Make rpc_api_pipe async
authorVolker Lendecke <vl@samba.org>
Sat, 17 Jan 2009 12:33:34 +0000 (13:33 +0100)
committerVolker Lendecke <vl@samba.org>
Sun, 18 Jan 2009 14:40:44 +0000 (15:40 +0100)
source3/rpc_client/cli_pipe.c

index 4c0cb78a04c04ea18bcec86aa7d2ee8f4afa1907..c924436faa45623107629029d6f379c003ebfb6e 100644 (file)
@@ -1265,6 +1265,8 @@ static NTSTATUS cli_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
 
  ****************************************************************************/
 
+#if 0
+
 static NTSTATUS rpc_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
                        prs_struct *data, /* Outgoing pdu fragment, already formatted for send. */
                        prs_struct *rbuf, /* Incoming reply - return as an NDR stream. */
@@ -1398,6 +1400,288 @@ static NTSTATUS rpc_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
        prs_mem_free(rbuf);
        return ret;
 }
+#endif
+
+struct rpc_api_pipe_state {
+       struct event_context *ev;
+       struct rpc_pipe_client *cli;
+       uint8_t expected_pkt_type;
+
+       prs_struct incoming_frag;
+       struct rpc_hdr_info rhdr;
+
+       prs_struct incoming_pdu;        /* Incoming reply */
+       uint32_t incoming_pdu_offset;
+};
+
+static int rpc_api_pipe_state_destructor(struct rpc_api_pipe_state *state)
+{
+       prs_mem_free(&state->incoming_frag);
+       prs_mem_free(&state->incoming_pdu);
+       return 0;
+}
+
+static void rpc_api_pipe_trans_done(struct async_req *subreq);
+static void rpc_api_pipe_got_pdu(struct async_req *subreq);
+
+static struct async_req *rpc_api_pipe_send(TALLOC_CTX *mem_ctx,
+                                          struct event_context *ev,
+                                          struct rpc_pipe_client *cli,
+                                          prs_struct *data, /* Outgoing PDU */
+                                          uint8_t expected_pkt_type)
+{
+       struct async_req *result, *subreq;
+       struct rpc_api_pipe_state *state;
+       NTSTATUS status;
+
+       result = async_req_new(mem_ctx);
+       if (result == NULL) {
+               return NULL;
+       }
+       state = talloc(result, struct rpc_api_pipe_state);
+       if (state == NULL) {
+               goto fail;
+       }
+       result->private_data = state;
+
+       state->ev = ev;
+       state->cli = cli;
+       state->expected_pkt_type = expected_pkt_type;
+       state->incoming_pdu_offset = 0;
+
+       prs_init_empty(&state->incoming_frag, state, UNMARSHALL);
+
+       prs_init_empty(&state->incoming_pdu, state, UNMARSHALL);
+       /* Make incoming_pdu dynamic with no memory. */
+       prs_give_memory(&state->incoming_pdu, 0, 0, true);
+
+       talloc_set_destructor(state, rpc_api_pipe_state_destructor);
+
+       /*
+        * Ensure we're not sending too much.
+        */
+       if (prs_offset(data) > cli->max_xmit_frag) {
+               status = NT_STATUS_INVALID_PARAMETER;
+               goto post_status;
+       }
+
+       DEBUG(5,("rpc_api_pipe: %s\n", rpccli_pipe_txt(debug_ctx(), cli)));
+
+       subreq = cli_api_pipe_send(state, ev, cli,
+                                  (uint8_t *)prs_data_p(data),
+                                  prs_offset(data), cli->max_recv_frag);
+       if (subreq == NULL) {
+               status = NT_STATUS_NO_MEMORY;
+               goto post_status;
+       }
+       subreq->async.fn = rpc_api_pipe_trans_done;
+       subreq->async.priv = result;
+       return result;
+
+ post_status:
+       if (async_post_status(result, ev, status)) {
+               return result;
+       }
+ fail:
+       TALLOC_FREE(result);
+       return NULL;
+}
+
+static void rpc_api_pipe_trans_done(struct async_req *subreq)
+{
+       struct async_req *req = talloc_get_type_abort(
+               subreq->async.priv, struct async_req);
+       struct rpc_api_pipe_state *state = talloc_get_type_abort(
+               req->private_data, struct rpc_api_pipe_state);
+       NTSTATUS status;
+       uint8_t *rdata = NULL;
+       uint32_t rdata_len = 0;
+       char *rdata_copy;
+
+       status = cli_api_pipe_recv(subreq, state, &rdata, &rdata_len);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("cli_api_pipe failed: %s\n", nt_errstr(status)));
+               async_req_error(req, status);
+               return;
+       }
+
+       if (rdata == NULL) {
+               DEBUG(3,("rpc_api_pipe: %s failed to return data.\n",
+                        rpccli_pipe_txt(debug_ctx(), state->cli)));
+               async_req_done(req);
+               return;
+       }
+
+       /*
+        * Give the memory received from cli_trans as dynamic to the current
+        * pdu. Duplicating it sucks, but prs_struct doesn't know about talloc
+        * :-(
+        */
+       rdata_copy = (char *)memdup(rdata, rdata_len);
+       TALLOC_FREE(rdata);
+       if (async_req_nomem(rdata_copy, req)) {
+               return;
+       }
+       prs_give_memory(&state->incoming_frag, rdata_copy, rdata_len, true);
+
+       /* Ensure we have enough data for a pdu. */
+       subreq = get_complete_frag_send(state, state->ev, state->cli,
+                                       &state->rhdr, &state->incoming_frag);
+       if (async_req_nomem(subreq, req)) {
+               return;
+       }
+       subreq->async.fn = rpc_api_pipe_got_pdu;
+       subreq->async.priv = req;
+}
+
+static void rpc_api_pipe_got_pdu(struct async_req *subreq)
+{
+       struct async_req *req = talloc_get_type_abort(
+               subreq->async.priv, struct async_req);
+       struct rpc_api_pipe_state *state = talloc_get_type_abort(
+               req->private_data, struct rpc_api_pipe_state);
+       NTSTATUS status;
+       char *rdata = NULL;
+       uint32_t rdata_len = 0;
+
+       status = get_complete_frag_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("get_complete_frag failed: %s\n",
+                         nt_errstr(status)));
+               async_req_error(req, status);
+               return;
+       }
+
+       status = cli_pipe_validate_current_pdu(
+               state->cli, &state->rhdr, &state->incoming_frag,
+               state->expected_pkt_type, &rdata, &rdata_len,
+               &state->incoming_pdu);
+
+       DEBUG(10,("rpc_api_pipe: got frag len of %u at offset %u: %s\n",
+                 (unsigned)prs_data_size(&state->incoming_frag),
+                 (unsigned)state->incoming_pdu_offset,
+                 nt_errstr(status)));
+
+       if (!NT_STATUS_IS_OK(status)) {
+               async_req_error(req, status);
+               return;
+       }
+
+       if ((state->rhdr.flags & RPC_FLG_FIRST)
+           && (state->rhdr.pack_type[0] == 0)) {
+               /*
+                * Set the data type correctly for big-endian data on the
+                * first packet.
+                */
+               DEBUG(10,("rpc_api_pipe: On %s PDU data format is "
+                         "big-endian.\n",
+                         rpccli_pipe_txt(debug_ctx(), state->cli)));
+               prs_set_endian_data(&state->incoming_pdu, RPC_BIG_ENDIAN);
+       }
+       /*
+        * Check endianness on subsequent packets.
+        */
+       if (state->incoming_frag.bigendian_data
+           != state->incoming_pdu.bigendian_data) {
+               DEBUG(0,("rpc_api_pipe: Error : Endianness changed from %s to "
+                        "%s\n",
+                        state->incoming_pdu.bigendian_data?"big":"little",
+                        state->incoming_frag.bigendian_data?"big":"little"));
+               async_req_error(req, NT_STATUS_INVALID_PARAMETER);
+               return;
+       }
+
+       /* Now copy the data portion out of the pdu into rbuf. */
+       if (!prs_force_grow(&state->incoming_pdu, rdata_len)) {
+               async_req_error(req, NT_STATUS_NO_MEMORY);
+               return;
+       }
+
+       memcpy(prs_data_p(&state->incoming_pdu) + state->incoming_pdu_offset,
+              rdata, (size_t)rdata_len);
+       state->incoming_pdu_offset += rdata_len;
+
+       status = cli_pipe_reset_current_pdu(state->cli, &state->rhdr,
+                                           &state->incoming_frag);
+       if (!NT_STATUS_IS_OK(status)) {
+               async_req_error(req, status);
+               return;
+       }
+
+       if (state->rhdr.flags & RPC_FLG_LAST) {
+               DEBUG(10,("rpc_api_pipe: %s returned %u bytes.\n",
+                         rpccli_pipe_txt(debug_ctx(), state->cli),
+                         (unsigned)prs_data_size(&state->incoming_pdu)));
+               async_req_done(req);
+               return;
+       }
+
+       subreq = get_complete_frag_send(state, state->ev, state->cli,
+                                       &state->rhdr, &state->incoming_frag);
+       if (async_req_nomem(subreq, req)) {
+               return;
+       }
+       subreq->async.fn = rpc_api_pipe_got_pdu;
+       subreq->async.priv = req;
+}
+
+static NTSTATUS rpc_api_pipe_recv(struct async_req *req, TALLOC_CTX *mem_ctx,
+                                 prs_struct *reply_pdu)
+{
+       struct rpc_api_pipe_state *state = talloc_get_type_abort(
+               req->private_data, struct rpc_api_pipe_state);
+       NTSTATUS status;
+
+       if (async_req_is_error(req, &status)) {
+               return status;
+       }
+
+       *reply_pdu = state->incoming_pdu;
+       reply_pdu->mem_ctx = mem_ctx;
+
+       /*
+        * Prevent state->incoming_pdu from being freed in
+        * rpc_api_pipe_state_destructor()
+        */
+       prs_init_empty(&state->incoming_pdu, state, UNMARSHALL);
+
+       return NT_STATUS_OK;
+}
+
+static NTSTATUS rpc_api_pipe(TALLOC_CTX *mem_ctx, struct rpc_pipe_client *cli,
+                            prs_struct *data, /* Outgoing pdu fragment,
+                                               * already formatted for
+                                               * send. */
+                            prs_struct *rbuf, /* Incoming reply - return as
+                                               * an NDR stream. */
+                            uint8 expected_pkt_type)
+{
+       TALLOC_CTX *frame = talloc_stackframe();
+       struct event_context *ev;
+       struct async_req *req;
+       NTSTATUS status = NT_STATUS_NO_MEMORY;
+
+       ev = event_context_init(frame);
+       if (ev == NULL) {
+               goto fail;
+       }
+
+       req = rpc_api_pipe_send(frame, ev, cli, data, expected_pkt_type);
+       if (req == NULL) {
+               goto fail;
+       }
+
+       while (req->state < ASYNC_REQ_DONE) {
+               event_loop_once(ev);
+       }
+
+       status = rpc_api_pipe_recv(req, mem_ctx, rbuf);
+ fail:
+       TALLOC_FREE(frame);
+       return status;
+}
 
 /*******************************************************************
  Creates krb5 auth bind.