tsocket: add tstream_context infrastructure similar to tdgram_context
authorStefan Metzmacher <metze@samba.org>
Fri, 3 Apr 2009 16:08:10 +0000 (18:08 +0200)
committerStefan Metzmacher <metze@samba.org>
Fri, 1 May 2009 15:41:53 +0000 (17:41 +0200)
metze

lib/tsocket/tsocket.c
lib/tsocket/tsocket.h
lib/tsocket/tsocket_internal.h

index dbac6e26cf896eeb0e98a7a2c0f497a4e981e857..e618fb79b8318f520fb4d73a79959777393f01a0 100644 (file)
@@ -22,6 +22,7 @@
 */
 
 #include "replace.h"
+#include "system/filesys.h"
 #include "tsocket.h"
 #include "tsocket_internal.h"
 
@@ -431,3 +432,377 @@ int tdgram_disconnect_recv(struct tevent_req *req,
        return ret;
 }
 
+struct tstream_context {
+       const char *location;
+       const struct tstream_context_ops *ops;
+       void *private_data;
+
+       struct tevent_req *readv_req;
+       struct tevent_req *writev_req;
+};
+
+static int tstream_context_destructor(struct tstream_context *stream)
+{
+       if (stream->readv_req) {
+               tevent_req_received(stream->readv_req);
+       }
+
+       if (stream->writev_req) {
+               tevent_req_received(stream->writev_req);
+       }
+
+       return 0;
+}
+
+struct tstream_context *_tstream_context_create(TALLOC_CTX *mem_ctx,
+                                       const struct tstream_context_ops *ops,
+                                       void *pstate,
+                                       size_t psize,
+                                       const char *type,
+                                       const char *location)
+{
+       struct tstream_context *stream;
+       void **ppstate = (void **)pstate;
+       void *state;
+
+       stream = talloc(mem_ctx, struct tstream_context);
+       if (stream == NULL) {
+               return NULL;
+       }
+       stream->location        = location;
+       stream->ops             = ops;
+       stream->readv_req       = NULL;
+       stream->writev_req      = NULL;
+
+       state = talloc_size(stream, psize);
+       if (state == NULL) {
+               talloc_free(stream);
+               return NULL;
+       }
+       talloc_set_name_const(state, type);
+
+       stream->private_data = state;
+
+       talloc_set_destructor(stream, tstream_context_destructor);
+
+       *ppstate = state;
+       return stream;
+}
+
+void *_tstream_context_data(struct tstream_context *stream)
+{
+       return stream->private_data;
+}
+
+ssize_t tstream_pending_bytes(struct tstream_context *stream)
+{
+       return stream->ops->pending_bytes(stream);
+}
+
+struct tstream_readv_state {
+       const struct tstream_context_ops *ops;
+       struct tstream_context *stream;
+       int ret;
+};
+
+static int tstream_readv_destructor(struct tstream_readv_state *state)
+{
+       if (state->stream) {
+               state->stream->readv_req = NULL;
+       }
+
+       return 0;
+}
+
+static void tstream_readv_done(struct tevent_req *subreq);
+
+struct tevent_req *tstream_readv_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct tstream_context *stream,
+                                     struct iovec *vector,
+                                     size_t count)
+{
+       struct tevent_req *req;
+       struct tstream_readv_state *state;
+       struct tevent_req *subreq;
+       int to_read = 0;
+       size_t i;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct tstream_readv_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ops = stream->ops;
+       state->stream = stream;
+       state->ret = -1;
+
+       /* first check if the input is ok */
+       if (count > IOV_MAX) {
+               tevent_req_error(req, EMSGSIZE);
+               goto post;
+       }
+
+       for (i=0; i < count; i++) {
+               int tmp = to_read;
+               tmp += vector[i].iov_len;
+
+               if (tmp < to_read) {
+                       tevent_req_error(req, EMSGSIZE);
+                       goto post;
+               }
+
+               to_read = tmp;
+       }
+
+       if (to_read == 0) {
+               tevent_req_error(req, EINVAL);
+               goto post;
+       }
+
+       if (stream->readv_req) {
+               tevent_req_error(req, EBUSY);
+               goto post;
+       }
+       stream->readv_req = req;
+
+       talloc_set_destructor(state, tstream_readv_destructor);
+
+       subreq = state->ops->readv_send(state, ev, stream, vector, count);
+       if (tevent_req_nomem(subreq, req)) {
+               goto post;
+       }
+       tevent_req_set_callback(subreq, tstream_readv_done, req);
+
+       return req;
+
+ post:
+       tevent_req_post(req, ev);
+       return req;
+}
+
+static void tstream_readv_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct tstream_readv_state *state = tevent_req_data(req,
+                                           struct tstream_readv_state);
+       ssize_t ret;
+       int sys_errno;
+
+       ret = state->ops->readv_recv(subreq, &sys_errno);
+       if (ret == -1) {
+               tevent_req_error(req, sys_errno);
+               return;
+       }
+
+       state->ret = ret;
+
+       tevent_req_done(req);
+}
+
+int tstream_readv_recv(struct tevent_req *req,
+                      int *perrno)
+{
+       struct tstream_readv_state *state = tevent_req_data(req,
+                                           struct tstream_readv_state);
+       int ret;
+
+       ret = tsocket_simple_int_recv(req, perrno);
+       if (ret == 0) {
+               ret = state->ret;
+       }
+
+       tevent_req_received(req);
+       return ret;
+}
+
+struct tstream_writev_state {
+       const struct tstream_context_ops *ops;
+       struct tstream_context *stream;
+       int ret;
+};
+
+static int tstream_writev_destructor(struct tstream_writev_state *state)
+{
+       if (state->stream) {
+               state->stream->writev_req = NULL;
+       }
+
+       return 0;
+}
+
+static void tstream_writev_done(struct tevent_req *subreq);
+
+struct tevent_req *tstream_writev_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct tstream_context *stream,
+                                      const struct iovec *vector,
+                                      size_t count)
+{
+       struct tevent_req *req;
+       struct tstream_writev_state *state;
+       struct tevent_req *subreq;
+       int to_write = 0;
+       size_t i;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct tstream_writev_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ops = stream->ops;
+       state->stream = stream;
+       state->ret = -1;
+
+       /* first check if the input is ok */
+       if (count > IOV_MAX) {
+               tevent_req_error(req, EMSGSIZE);
+               goto post;
+       }
+
+       for (i=0; i < count; i++) {
+               int tmp = to_write;
+               tmp += vector[i].iov_len;
+
+               if (tmp < to_write) {
+                       tevent_req_error(req, EMSGSIZE);
+                       goto post;
+               }
+
+               to_write = tmp;
+       }
+
+       if (to_write == 0) {
+               tevent_req_error(req, EINVAL);
+               goto post;
+       }
+
+       if (stream->writev_req) {
+               tevent_req_error(req, EBUSY);
+               goto post;
+       }
+       stream->writev_req = req;
+
+       talloc_set_destructor(state, tstream_writev_destructor);
+
+       subreq = state->ops->writev_send(state, ev, stream, vector, count);
+       if (tevent_req_nomem(subreq, req)) {
+               goto post;
+       }
+       tevent_req_set_callback(subreq, tstream_writev_done, req);
+
+       return req;
+
+ post:
+       tevent_req_post(req, ev);
+       return req;
+}
+
+static void tstream_writev_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct tstream_writev_state *state = tevent_req_data(req,
+                                            struct tstream_writev_state);
+       ssize_t ret;
+       int sys_errno;
+
+       ret = state->ops->writev_recv(subreq, &sys_errno);
+       if (ret == -1) {
+               tevent_req_error(req, sys_errno);
+               return;
+       }
+
+       state->ret = ret;
+
+       tevent_req_done(req);
+}
+
+int tstream_writev_recv(struct tevent_req *req,
+                      int *perrno)
+{
+       struct tstream_writev_state *state = tevent_req_data(req,
+                                            struct tstream_writev_state);
+       int ret;
+
+       ret = tsocket_simple_int_recv(req, perrno);
+       if (ret == 0) {
+               ret = state->ret;
+       }
+
+       tevent_req_received(req);
+       return ret;
+}
+
+struct tstream_disconnect_state {
+       const struct tstream_context_ops *ops;
+};
+
+static void tstream_disconnect_done(struct tevent_req *subreq);
+
+struct tevent_req *tstream_disconnect_send(TALLOC_CTX *mem_ctx,
+                                          struct tevent_context *ev,
+                                          struct tstream_context *stream)
+{
+       struct tevent_req *req;
+       struct tstream_disconnect_state *state;
+       struct tevent_req *subreq;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct tstream_disconnect_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ops = stream->ops;
+
+       if (stream->readv_req || stream->writev_req) {
+               tevent_req_error(req, EBUSY);
+               goto post;
+       }
+
+       subreq = state->ops->disconnect_send(state, ev, stream);
+       if (tevent_req_nomem(subreq, req)) {
+               goto post;
+       }
+       tevent_req_set_callback(subreq, tstream_disconnect_done, req);
+
+       return req;
+
+ post:
+       tevent_req_post(req, ev);
+       return req;
+}
+
+static void tstream_disconnect_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct tstream_disconnect_state *state = tevent_req_data(req,
+                                                struct tstream_disconnect_state);
+       int ret;
+       int sys_errno;
+
+       ret = state->ops->disconnect_recv(subreq, &sys_errno);
+       if (ret == -1) {
+               tevent_req_error(req, sys_errno);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+int tstream_disconnect_recv(struct tevent_req *req,
+                          int *perrno)
+{
+       int ret;
+
+       ret = tsocket_simple_int_recv(req, perrno);
+
+       tevent_req_received(req);
+       return ret;
+}
+
index a4ae3fc32a26052511efc89d5d4e9a3807dce6cc..96fd6fe395eaebb313312fad1ab83798b9c36211 100644 (file)
@@ -29,6 +29,7 @@
 
 struct tsocket_address;
 struct tdgram_context;
+struct tstream_context;
 struct iovec;
 
 /*
@@ -70,6 +71,33 @@ struct tevent_req *tdgram_disconnect_send(TALLOC_CTX *mem_ctx,
 int tdgram_disconnect_recv(struct tevent_req *req,
                           int *perrno);
 
+/*
+ * tstream_context related functions
+ */
+ssize_t tstream_pending_bytes(struct tstream_context *stream);
+
+struct tevent_req *tstream_readv_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct tstream_context *stream,
+                                     struct iovec *vector,
+                                     size_t count);
+int tstream_readv_recv(struct tevent_req *req,
+                      int *perrno);
+
+struct tevent_req *tstream_writev_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct tstream_context *stream,
+                                      const struct iovec *vector,
+                                      size_t count);
+int tstream_writev_recv(struct tevent_req *req,
+                       int *perrno);
+
+struct tevent_req *tstream_disconnect_send(TALLOC_CTX *mem_ctx,
+                                          struct tevent_context *ev,
+                                          struct tstream_context *stream);
+int tstream_disconnect_recv(struct tevent_req *req,
+                           int *perrno);
+
 /*
  * BSD sockets: inet, inet6 and unix
  */
index a03dc9bde0ca66eb4bed044fefbee071e10de9ae..55bbe9f4cb76e22b2fdc39dcae165544fe2bc68b 100644 (file)
@@ -93,6 +93,48 @@ void *_tdgram_context_data(struct tdgram_context *dgram);
 #define tdgram_context_data(_req, _type) \
        talloc_get_type_abort(_tdgram_context_data(_req), _type)
 
+struct tstream_context_ops {
+       const char *name;
+
+       ssize_t (*pending_bytes)(struct tstream_context *stream);
+
+       struct tevent_req *(*readv_send)(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct tstream_context *stream,
+                                        struct iovec *vector,
+                                        size_t count);
+       int (*readv_recv)(struct tevent_req *req,
+                         int *perrno);
+
+       struct tevent_req *(*writev_send)(TALLOC_CTX *mem_ctx,
+                                         struct tevent_context *ev,
+                                         struct tstream_context *stream,
+                                         const struct iovec *vector,
+                                         size_t count);
+       int (*writev_recv)(struct tevent_req *req,
+                          int *perrno);
+
+       struct tevent_req *(*disconnect_send)(TALLOC_CTX *mem_ctx,
+                                             struct tevent_context *ev,
+                                             struct tstream_context *stream);
+       int (*disconnect_recv)(struct tevent_req *req,
+                              int *perrno);
+};
+
+struct tstream_context *_tstream_context_create(TALLOC_CTX *mem_ctx,
+                                       const struct tstream_context_ops *ops,
+                                       void *pstate,
+                                       size_t psize,
+                                       const char *type,
+                                       const char *location);
+#define tstream_context_create(mem_ctx, ops, state, type, location) \
+       _tstream_context_create(mem_ctx, ops, state, sizeof(type), \
+                               #type, location)
+
+void *_tstream_context_data(struct tstream_context *stream);
+#define tstream_context_data(_req, _type) \
+       talloc_get_type_abort(_tstream_context_data(_req), _type)
+
 int tsocket_simple_int_recv(struct tevent_req *req, int *perrno);
 
 #endif /* _TSOCKET_H */