Add async read_packet
authorVolker Lendecke <vl@samba.org>
Sun, 22 Feb 2009 22:13:34 +0000 (23:13 +0100)
committerVolker Lendecke <vl@samba.org>
Tue, 24 Feb 2009 19:40:47 +0000 (20:40 +0100)
lib/async_req/async_sock.c
lib/async_req/async_sock.h

index 67776ff67fc7ee31e92fe3eec5851597f70c5b3c..02e4c9eb4bf5a33ee703ad7a7beb4c8d7c34af9a 100644 (file)
@@ -765,3 +765,121 @@ ssize_t writev_recv(struct tevent_req *req, int *perrno)
        }
        return state->total_size;
 }
+
+struct read_packet_state {
+       int fd;
+       uint8_t *buf;
+       size_t nread;
+       ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
+       void *private_data;
+};
+
+static void read_packet_handler(struct tevent_context *ev,
+                               struct tevent_fd *fde,
+                               uint16_t flags, void *private_data);
+
+struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
+                                   struct tevent_context *ev,
+                                   int fd, size_t initial,
+                                   ssize_t (*more)(uint8_t *buf,
+                                                   size_t buflen,
+                                                   void *private_data),
+                                   void *private_data)
+{
+       struct tevent_req *result;
+       struct read_packet_state *state;
+       struct tevent_fd *fde;
+
+       result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
+       if (result == NULL) {
+               return NULL;
+       }
+       state->fd = fd;
+       state->nread = 0;
+       state->more = more;
+       state->private_data = private_data;
+
+       state->buf = talloc_array(state, uint8_t, initial);
+       if (state->buf == NULL) {
+               goto fail;
+       }
+
+       fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
+                           result);
+       if (fde == NULL) {
+               goto fail;
+       }
+       return result;
+ fail:
+       TALLOC_FREE(result);
+       return NULL;
+}
+
+static void read_packet_handler(struct tevent_context *ev,
+                               struct tevent_fd *fde,
+                               uint16_t flags, void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct read_packet_state *state = talloc_get_type_abort(
+               req->private_state, struct read_packet_state);
+       size_t total = talloc_get_size(state->buf);
+       ssize_t nread, more;
+       uint8_t *tmp;
+
+       nread = read(state->fd, state->buf+state->nread, total-state->nread);
+       if (nread == -1) {
+               tevent_req_error(req, errno);
+               return;
+       }
+       if (nread == 0) {
+               tevent_req_error(req, EPIPE);
+               return;
+       }
+
+       state->nread += nread;
+       if (state->nread < total) {
+               /* Come back later */
+               return;
+       }
+
+       /*
+        * We got what was initially requested. See if "more" asks for -- more.
+        */
+       if (state->more == NULL) {
+               /* Nobody to ask, this is a async read_data */
+               tevent_req_done(req);
+               return;
+       }
+
+       more = state->more(state->buf, total, state->private_data);
+       if (more == -1) {
+               /* We got an invalid packet, tell the caller */
+               tevent_req_error(req, EIO);
+               return;
+       }
+       if (more == 0) {
+               /* We're done, full packet received */
+               tevent_req_done(req);
+               return;
+       }
+
+       tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
+       if (tevent_req_nomem(tmp, req)) {
+               return;
+       }
+       state->buf = tmp;
+}
+
+ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                        uint8_t **pbuf, int *perrno)
+{
+       struct read_packet_state *state = talloc_get_type_abort(
+               req->private_state, struct read_packet_state);
+
+       if (tevent_req_is_unix_error(req, perrno)) {
+               return -1;
+       }
+       *pbuf = talloc_move(mem_ctx, &state->buf);
+       return talloc_get_size(*pbuf);
+}
index 6a862c45c6cef4b6f0cf6e99c8d4aa9bbd27da0b..0cf4e4ecf5a78bfe20ed1950792b9dc2a3399b32 100644 (file)
@@ -51,4 +51,14 @@ struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                               int fd, struct iovec *iov, int count);
 ssize_t writev_recv(struct tevent_req *req, int *perrno);
 
+struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
+                                   struct tevent_context *ev,
+                                   int fd, size_t initial,
+                                   ssize_t (*more)(uint8_t *buf,
+                                                   size_t buflen,
+                                                   void *private_data),
+                                   void *private_data);
+ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                        uint8_t **pbuf, int *perrno);
+
 #endif