Make rpc_read async
authorVolker Lendecke <vl@samba.org>
Thu, 15 Jan 2009 20:56:03 +0000 (21:56 +0100)
committerVolker Lendecke <vl@samba.org>
Sun, 18 Jan 2009 14:40:43 +0000 (15:40 +0100)
source3/rpc_client/cli_pipe.c

index 28bbfa57b6673310f777870b1490cc2504ff1028..1f7e33261227b7a9120633e29a8643e914a76fe3 100644 (file)
@@ -172,57 +172,6 @@ static uint32 get_rpc_call_id(void)
        return ++call_id;
 }
 
-/*******************************************************************
- Read from a RPC named pipe
- ********************************************************************/
-static NTSTATUS rpc_read_np(struct cli_state *cli, const char *pipe_name,
-                           int fnum, char *buf, size_t size,
-                           ssize_t *pnum_read)
-{
-       ssize_t num_read;
-
-       num_read = cli_read(cli, fnum, buf, 0, size);
-
-       DEBUG(5,("rpc_read_np: num_read = %d, to read: %u\n", (int)num_read,
-               (unsigned int)size));
-
-       /*
-       * A dos error of ERRDOS/ERRmoredata is not an error.
-       */
-       if (cli_is_dos_error(cli)) {
-              uint32 ecode;
-              uint8 eclass;
-              cli_dos_error(cli, &eclass, &ecode);
-              if (eclass != ERRDOS && ecode != ERRmoredata) {
-                      DEBUG(0,("rpc_read: DOS Error %d/%u (%s) in cli_read "
-                               "on fnum 0x%x\n", eclass, (unsigned int)ecode,
-                               cli_errstr(cli), fnum));
-                      return dos_to_ntstatus(eclass, ecode);
-              }
-       }
-
-       /*
-       * Likewise for NT_STATUS_BUFFER_TOO_SMALL
-       */
-       if (cli_is_nt_error(cli)) {
-              if (!NT_STATUS_EQUAL(cli_nt_error(cli),
-                                   NT_STATUS_BUFFER_TOO_SMALL)) {
-                      DEBUG(0,("rpc_read: Error (%s) in cli_read on fnum "
-                               "0x%x\n", nt_errstr(cli_nt_error(cli)), fnum));
-                      return cli_nt_error(cli);
-              }
-       }
-
-       if (num_read == -1) {
-              DEBUG(0,("rpc_read: Error - cli_read on fnum 0x%x returned "
-                       "-1\n", fnum));
-              return cli_get_nt_error(cli);
-       }
-
-       *pnum_read = num_read;
-       return NT_STATUS_OK;
-}
-
 /*
  * Realloc pdu to have a least "size" bytes
  */
@@ -254,53 +203,171 @@ static bool rpc_grow_buffer(prs_struct *pdu, size_t size)
  Reads the whole size or give an error message
  ********************************************************************/
 
-static NTSTATUS rpc_read(struct rpc_pipe_client *cli,
-                        char *pdata, size_t size)
+struct rpc_read_state {
+       struct event_context *ev;
+       struct rpc_pipe_client *cli;
+       char *data;
+       size_t size;
+       size_t num_read;
+};
+
+static void rpc_read_np_done(struct async_req *subreq);
+static void rpc_read_sock_done(struct async_req *subreq);
+
+static struct async_req *rpc_read_send(TALLOC_CTX *mem_ctx,
+                                      struct event_context *ev,
+                                      struct rpc_pipe_client *cli,
+                                      char *data, size_t size)
 {
-       ssize_t num_read = 0;
+       struct async_req *result, *subreq;
+       struct rpc_read_state *state;
 
-       DEBUG(5, ("rpc_read: data_to_read: %u\n", (unsigned int)size));
+       result = async_req_new(mem_ctx);
+       if (result == NULL) {
+               return NULL;
+       }
+       state = talloc(result, struct rpc_read_state);
+       if (state == NULL) {
+               goto fail;
+       }
+       result->private_data = state;
 
-       while (num_read < size) {
-               ssize_t thistime = 0;
-               NTSTATUS status;
+       state->ev = ev;
+       state->cli = cli;
+       state->data = data;
+       state->size = size;
+       state->num_read = 0;
 
-               switch (cli->transport_type) {
-               case NCACN_NP:
-                       status = rpc_read_np(cli->trans.np.cli,
-                                            cli->trans.np.pipe_name,
-                                            cli->trans.np.fnum,
-                                            pdata + num_read,
-                                            size - num_read, &thistime);
-                       break;
-               case NCACN_IP_TCP:
-               case NCACN_UNIX_STREAM:
-                       status = NT_STATUS_OK;
-                       thistime = sys_read(cli->trans.sock.fd,
-                                           pdata + num_read,
-                                           size - num_read);
-                       if (thistime == -1) {
-                               status = map_nt_error_from_unix(errno);
-                       }
-                       break;
-               default:
-                       DEBUG(0, ("unknown transport type %d\n",
-                                 cli->transport_type));
-                       return NT_STATUS_INTERNAL_ERROR;
-               }
+       DEBUG(5, ("rpc_read_send: data_to_read: %u\n", (unsigned int)size));
 
-               if (!NT_STATUS_IS_OK(status)) {
-                       return status;
+       if (cli->transport_type == NCACN_NP) {
+               subreq = cli_read_andx_send(
+                       state, ev, cli->trans.np.cli,
+                       cli->trans.np.fnum, 0, size);
+               if (subreq == NULL) {
+                       DEBUG(10, ("cli_read_andx_send failed\n"));
+                       goto fail;
                }
-               if (thistime == 0) {
-                       return NT_STATUS_END_OF_FILE;
+               subreq->async.fn = rpc_read_np_done;
+               subreq->async.priv = result;
+               return result;
+       }
+
+       if ((cli->transport_type == NCACN_IP_TCP)
+           || (cli->transport_type == NCACN_UNIX_STREAM)) {
+               subreq = recvall_send(state, ev, cli->trans.sock.fd,
+                                     data, size, 0);
+               if (subreq == NULL) {
+                       DEBUG(10, ("recvall_send failed\n"));
+                       goto fail;
                }
+               subreq->async.fn = rpc_read_sock_done;
+               subreq->async.priv = result;
+               return result;
+       }
 
-               num_read += thistime;
+       if (async_post_status(result, ev, NT_STATUS_INVALID_PARAMETER)) {
+               return result;
+       }
+ fail:
+       TALLOC_FREE(result);
+       return NULL;
+}
 
+static void rpc_read_np_done(struct async_req *subreq)
+{
+       struct async_req *req = talloc_get_type_abort(
+               subreq->async.priv, struct async_req);
+       struct rpc_read_state *state = talloc_get_type_abort(
+               req->private_data, struct rpc_read_state);
+       NTSTATUS status;
+       ssize_t received;
+       uint8_t *rcvbuf;
+
+       status = cli_read_andx_recv(subreq, &received, &rcvbuf);
+       /*
+        * We can't TALLOC_FREE(subreq) as usual here, as rcvbuf still is a
+        * child of that.
+        */
+       if (NT_STATUS_EQUAL(status, NT_STATUS_BUFFER_TOO_SMALL)) {
+               status = NT_STATUS_OK;
+       }
+       if (!NT_STATUS_IS_OK(status)) {
+               TALLOC_FREE(subreq);
+               async_req_error(req, status);
+               return;
        }
 
-       return NT_STATUS_OK;
+       memcpy(state->data + state->num_read, rcvbuf, received);
+       TALLOC_FREE(subreq);
+
+       state->num_read += received;
+
+       if (state->num_read == state->size) {
+               async_req_done(req);
+               return;
+       }
+
+       subreq = cli_read_andx_send(
+               state, state->ev, state->cli->trans.np.cli,
+               state->cli->trans.np.fnum, 0,
+               state->size - state->num_read);
+
+       if (async_req_nomem(subreq, req)) {
+               return;
+       }
+
+       subreq->async.fn = rpc_read_np_done;
+       subreq->async.priv = req;
+}
+
+static void rpc_read_sock_done(struct async_req *subreq)
+{
+       struct async_req *req = talloc_get_type_abort(
+               subreq->async.priv, struct async_req);
+       NTSTATUS status;
+
+       status = recvall_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               async_req_error(req, status);
+               return;
+       }
+
+       async_req_done(req);
+}
+
+static NTSTATUS rpc_read_recv(struct async_req *req)
+{
+       return async_req_simple_recv(req);
+}
+
+static NTSTATUS rpc_read(struct rpc_pipe_client *cli,
+                        char *pdata, size_t size)
+{
+       TALLOC_CTX *frame = talloc_stackframe();
+       struct event_context *ev;
+       struct async_req *req;
+       NTSTATUS status = NT_STATUS_NO_MEMORY;
+
+       ev = event_context_init(frame);
+       if (ev == NULL) {
+               goto fail;
+       }
+
+       req = rpc_read_send(frame, ev, cli, pdata, size);
+       if (req == NULL) {
+               goto fail;
+       }
+
+       while (req->state < ASYNC_REQ_DONE) {
+               event_loop_once(ev);
+       }
+
+       status = rpc_read_recv(req);
+ fail:
+       TALLOC_FREE(frame);
+       return status;
 }
 
 /****************************************************************************