s4:librpc: use tstream in dcerpc_sock.c
authorGregor Beck <gbeck@sernet.de>
Wed, 4 Sep 2013 11:16:05 +0000 (13:16 +0200)
committerStefan Metzmacher <metze@samba.org>
Tue, 7 Jan 2014 07:37:43 +0000 (08:37 +0100)
Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>

Signed-off-by: Gregor Beck <gbeck@sernet.de>
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
source4/librpc/rpc/dcerpc_sock.c

index 4a62b2f497f04630acf861c976984e8c4cb1af23..928e7f7818706d2a9aa3643cde1364641fe3bac9 100644 (file)
 */
 
 #include "includes.h"
+#include "system/filesys.h"
 #include "lib/events/events.h"
 #include "lib/socket/socket.h"
-#include "lib/stream/packet.h"
+#include "lib/tsocket/tsocket.h"
 #include "libcli/composite/composite.h"
 #include "librpc/rpc/dcerpc.h"
 #include "librpc/rpc/dcerpc_proto.h"
 
 /* transport private information used by general socket pipe transports */
 struct sock_private {
-       struct tevent_fd *fde;
-       struct socket_context *sock;
        char *server_name;
 
-       struct packet_context *packet;
-       uint32_t pending_reads;
-
        const char *path; /* For ncacn_unix_sock and ncalrpc */
+
+       struct socket_address *peer_addr;
+
+       struct tstream_context *stream;
+       struct tevent_queue *write_queue;
+       struct tevent_req *read_subreq;
+       uint32_t pending_reads;
 };
 
 
@@ -49,25 +52,14 @@ struct sock_private {
 */
 static void sock_dead(struct dcecli_connection *p, NTSTATUS status)
 {
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
 
        if (!sock) return;
 
-       if (sock->packet) {
-               packet_recv_disable(sock->packet);
-               packet_set_fde(sock->packet, NULL);
-               packet_set_socket(sock->packet, NULL);
-       }
-
-       if (sock->fde) {
-               talloc_free(sock->fde);
-               sock->fde = NULL;
-       }
-
-       if (sock->sock) {
-               talloc_free(sock->sock);
-               sock->sock = NULL;
-       }
+       tevent_queue_stop(sock->write_queue);
+       TALLOC_FREE(sock->read_subreq);
+       TALLOC_FREE(sock->stream);
 
        if (NT_STATUS_EQUAL(NT_STATUS_UNSUCCESSFUL, status)) {
                status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
@@ -82,114 +74,146 @@ static void sock_dead(struct dcecli_connection *p, NTSTATUS status)
        }
 }
 
-
 /*
-  handle socket recv errors
+   initiate a read request - not needed for dcerpc sockets
 */
-static void sock_error_handler(void *private_data, NTSTATUS status)
+struct sock_send_read_state {
+       struct dcecli_connection *p;
+};
+
+static int sock_send_read_state_destructor(struct sock_send_read_state *state)
 {
-       struct dcecli_connection *p = talloc_get_type(private_data,
-                                                     struct dcecli_connection);
-       sock_dead(p, status);
+       struct dcecli_connection *p = state->p;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
+
+       sock->read_subreq = NULL;
+
+       return 0;
 }
 
-/*
-  check if a blob is a complete packet
-*/
-static NTSTATUS sock_complete_packet(void *private_data, DATA_BLOB blob, size_t *size)
+static void sock_send_read_done(struct tevent_req *subreq);
+
+static NTSTATUS sock_send_read(struct dcecli_connection *p)
 {
-       if (blob.length < DCERPC_FRAG_LEN_OFFSET+2) {
-               return STATUS_MORE_ENTRIES;
-       }
-       *size = dcerpc_get_frag_length(&blob);
-       if (*size < blob.length) {
-               /*
-                * something is wrong, let the caller deal with it
-                */
-               *size = blob.length;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
+       struct sock_send_read_state *state;
+
+       if (sock->read_subreq != NULL) {
+               sock->pending_reads++;
+               return NT_STATUS_OK;
        }
-       if (*size > blob.length) {
-               return STATUS_MORE_ENTRIES;
+
+       state = talloc_zero(sock, struct sock_send_read_state);
+       if (state == NULL) {
+               return NT_STATUS_NO_MEMORY;
        }
-       return NT_STATUS_OK;
-}
+       state->p = p;
 
-/*
-  process recv requests
-*/
-static NTSTATUS sock_process_recv(void *private_data, DATA_BLOB blob)
-{
-       struct dcecli_connection *p = talloc_get_type(private_data,
-                                                     struct dcecli_connection);
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
-       sock->pending_reads--;
-       if (sock->pending_reads == 0) {
-               packet_recv_disable(sock->packet);
+       talloc_set_destructor(state, sock_send_read_state_destructor);
+
+       sock->read_subreq = dcerpc_read_ncacn_packet_send(state,
+                                                         p->event_ctx,
+                                                         sock->stream);
+       if (sock->read_subreq == NULL) {
+               return NT_STATUS_NO_MEMORY;
        }
-       p->transport.recv_data(p, &blob, NT_STATUS_OK);
+       tevent_req_set_callback(sock->read_subreq, sock_send_read_done, state);
+
        return NT_STATUS_OK;
 }
 
-/*
-  called when a IO is triggered by the events system
-*/
-static void sock_io_handler(struct tevent_context *ev, struct tevent_fd *fde, 
-                           uint16_t flags, void *private_data)
+static void sock_send_read_done(struct tevent_req *subreq)
 {
-       struct dcecli_connection *p = talloc_get_type(private_data,
-                                                     struct dcecli_connection);
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
-
-       if (flags & TEVENT_FD_WRITE) {
-               packet_queue_run(sock->packet);
-               return;
-       }
+       struct sock_send_read_state *state =
+               tevent_req_callback_data(subreq,
+                                        struct sock_send_read_state);
+       struct dcecli_connection *p = state->p;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
+       NTSTATUS status;
+       struct ncacn_packet *pkt;
+       DATA_BLOB blob;
 
-       if (sock->sock == NULL) {
+       status = dcerpc_read_ncacn_packet_recv(subreq, state,
+                                              &pkt, &blob);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               TALLOC_FREE(state);
+               sock_dead(p, status);
                return;
        }
 
-       if (flags & TEVENT_FD_READ) {
-               packet_recv(sock->packet);
+       /*
+        * here we steal into thet connection context,
+        * but p->transport.recv_data() will steal or free it again
+        */
+       talloc_steal(p, blob.data);
+       TALLOC_FREE(state);
+
+       if (sock->pending_reads > 0) {
+               sock->pending_reads--;
+
+               status = sock_send_read(p);
+               if (!NT_STATUS_IS_OK(status)) {
+                       sock_dead(p, status);
+                       return;
+               }
        }
-}
 
-/* 
-   initiate a read request - not needed for dcerpc sockets
-*/
-static NTSTATUS sock_send_read(struct dcecli_connection *p)
-{
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
-       sock->pending_reads++;
-       if (sock->pending_reads == 1) {
-               packet_recv_enable(sock->packet);
+       if (p->transport.recv_data) {
+               p->transport.recv_data(p, &blob, NT_STATUS_OK);
        }
-       return NT_STATUS_OK;
 }
 
 /* 
    send an initial pdu in a multi-pdu sequence
 */
+
+struct sock_send_request_state {
+       struct dcecli_connection *p;
+       DATA_BLOB blob;
+       struct iovec iov;
+};
+
+static void sock_send_request_done(struct tevent_req *subreq);
+
 static NTSTATUS sock_send_request(struct dcecli_connection *p, DATA_BLOB *data, 
                                  bool trigger_read)
 {
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
-       DATA_BLOB blob;
-       NTSTATUS status;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
+       struct sock_send_request_state *state;
+       struct tevent_req *subreq;
 
-       if (sock->sock == NULL) {
+       if (sock->stream == NULL) {
                return NT_STATUS_CONNECTION_DISCONNECTED;
        }
 
-       blob = data_blob_talloc(sock->packet, data->data, data->length);
-       if (blob.data == NULL) {
+       state = talloc_zero(sock, struct sock_send_request_state);
+       if (state == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
+       state->p = p;
 
-       status = packet_send(sock->packet, blob);
-       if (!NT_STATUS_IS_OK(status)) {
-               return status;
+       state->blob = data_blob_talloc(state, data->data, data->length);
+       if (state->blob.data == NULL) {
+               TALLOC_FREE(state);
+               return NT_STATUS_NO_MEMORY;
        }
+       state->iov.iov_base = (void *)state->blob.data;
+       state->iov.iov_len = state->blob.length;
+
+       subreq = tstream_writev_queue_send(state, p->event_ctx,
+                                          sock->stream,
+                                          sock->write_queue,
+                                          &state->iov, 1);
+       if (subreq == NULL) {
+               TALLOC_FREE(state);
+               return NT_STATUS_NO_MEMORY;
+       }
+       tevent_req_set_callback(subreq, sock_send_request_done, state);
 
        if (trigger_read) {
                sock_send_read(p);
@@ -198,14 +222,37 @@ static NTSTATUS sock_send_request(struct dcecli_connection *p, DATA_BLOB *data,
        return NT_STATUS_OK;
 }
 
+static void sock_send_request_done(struct tevent_req *subreq)
+{
+       struct sock_send_request_state *state =
+               tevent_req_callback_data(subreq,
+               struct sock_send_request_state);
+       int ret;
+       int error;
+
+       ret = tstream_writev_queue_recv(subreq, &error);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               struct dcecli_connection *p = state->p;
+               NTSTATUS status = map_nt_error_from_unix_common(error);
+
+               TALLOC_FREE(state);
+               sock_dead(p, status);
+               return;
+       }
+
+       TALLOC_FREE(state);
+}
+
 /* 
    shutdown sock pipe connection
 */
 static NTSTATUS sock_shutdown_pipe(struct dcecli_connection *p, NTSTATUS status)
 {
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
 
-       if (sock && sock->sock) {
+       if (sock && sock->stream) {
                sock_dead(p, status);
        }
 
@@ -217,7 +264,8 @@ static NTSTATUS sock_shutdown_pipe(struct dcecli_connection *p, NTSTATUS status)
 */
 static const char *sock_peer_name(struct dcecli_connection *p)
 {
-       struct sock_private *sock = talloc_get_type(p->transport.private_data, struct sock_private);
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
        return sock->server_name;
 }
 
@@ -226,7 +274,8 @@ static const char *sock_peer_name(struct dcecli_connection *p)
 */
 static const char *sock_target_hostname(struct dcecli_connection *p)
 {
-       struct sock_private *sock = talloc_get_type(p->transport.private_data, struct sock_private);
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
        return sock->server_name;
 }
 
@@ -246,10 +295,12 @@ static void continue_socket_connect(struct composite_context *ctx)
 {
        struct dcecli_connection *conn;
        struct sock_private *sock;
-       struct composite_context *c = talloc_get_type(ctx->async.private_data,
-                                                     struct composite_context);
-       struct pipe_open_socket_state *s = talloc_get_type(c->private_data,
-                                                          struct pipe_open_socket_state);
+       struct composite_context *c = talloc_get_type_abort(
+               ctx->async.private_data, struct composite_context);
+       struct pipe_open_socket_state *s = talloc_get_type_abort(
+               c->private_data, struct pipe_open_socket_state);
+       int rc;
+       int sock_fd;
 
        /* make it easier to write a function calls */
        conn = s->conn;
@@ -264,6 +315,16 @@ static void continue_socket_connect(struct composite_context *ctx)
                return;
        }
 
+       sock_fd = socket_get_fd(s->socket_ctx);
+       sock->peer_addr = socket_get_peer_addr(s->socket_ctx, sock);
+       if (sock->peer_addr == NULL) {
+               talloc_free(sock);
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               return;
+       }
+       socket_set_flags(s->socket_ctx, SOCKET_FLAG_NOCLOSE);
+       TALLOC_FREE(s->socket_ctx);
+
        /*
          fill in the transport methods
        */
@@ -278,31 +339,33 @@ static void continue_socket_connect(struct composite_context *ctx)
        conn->transport.peer_name       = sock_peer_name;
        conn->transport.target_hostname = sock_target_hostname;
 
-       sock->sock          = s->socket_ctx;
+       /*
+        * Windows uses 5840 for ncacn_ip_tcp,
+        * so we also use it (for every transport which uses bsd sockets)
+        */
+       conn->srv_max_xmit_frag = 5840;
+       conn->srv_max_recv_frag = 5840;
+
        sock->pending_reads = 0;
        sock->server_name   = strupper_talloc(sock, s->target_hostname);
 
-       sock->fde = tevent_add_fd(conn->event_ctx, sock->sock, socket_get_fd(sock->sock),
-                                TEVENT_FD_READ, sock_io_handler, conn);
-       
        conn->transport.private_data = sock;
 
-       sock->packet = packet_init(sock);
-       if (sock->packet == NULL) {
+       rc = tstream_bsd_existing_socket(sock, sock_fd,
+                                        &sock->stream);
+       if (rc < 0) {
                talloc_free(sock);
                composite_error(c, NT_STATUS_NO_MEMORY);
                return;
        }
 
-       packet_set_private(sock->packet, conn);
-       packet_set_socket(sock->packet, sock->sock);
-       packet_set_callback(sock->packet, sock_process_recv);
-       packet_set_full_request(sock->packet, sock_complete_packet);
-       packet_set_error_handler(sock->packet, sock_error_handler);
-       packet_set_event_context(sock->packet, conn->event_ctx);
-       packet_set_fde(sock->packet, sock->fde);
-       packet_set_serialise(sock->packet);
-       packet_set_initial_read(sock->packet, 16);
+       sock->write_queue = tevent_queue_create(sock,
+                                               "dcerpc sock write queue");
+       if (sock->write_queue == NULL) {
+               talloc_free(sock);
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               return;
+       }
 
        /* ensure we don't get SIGPIPE */
        BlockSignals(true, SIGPIPE);
@@ -343,7 +406,7 @@ static struct composite_context *dcerpc_pipe_open_socket_send(TALLOC_CTX *mem_ct
                if (composite_nomem(s->target_hostname, c)) return c;
        }
 
-       s->sock = talloc(cn, struct sock_private);
+       s->sock = talloc_zero(cn, struct sock_private);
        if (composite_nomem(s->sock, c)) return c;
 
        c->status = socket_create(server->family, SOCKET_TYPE_STREAM, &s->socket_ctx, 0);
@@ -389,10 +452,10 @@ static void continue_ip_resolve_name(struct composite_context *ctx);
 
 static void continue_ip_resolve_name(struct composite_context *ctx)
 {
-       struct composite_context *c = talloc_get_type(ctx->async.private_data,
-                                                     struct composite_context);
-       struct pipe_tcp_state *s = talloc_get_type(c->private_data,
-                                                  struct pipe_tcp_state);
+       struct composite_context *c = talloc_get_type_abort(
+               ctx->async.private_data, struct composite_context);
+       struct pipe_tcp_state *s = talloc_get_type_abort(
+               c->private_data, struct pipe_tcp_state);
        struct composite_context *sock_ip_req;
 
        c->status = resolve_name_multiple_recv(ctx, s, &s->addresses);
@@ -417,10 +480,10 @@ static void continue_ip_resolve_name(struct composite_context *ctx)
 */
 static void continue_ip_open_socket(struct composite_context *ctx)
 {
-       struct composite_context *c = talloc_get_type(ctx->async.private_data,
-                                                     struct composite_context);
-       struct pipe_tcp_state *s = talloc_get_type(c->private_data,
-                                                  struct pipe_tcp_state);
+       struct composite_context *c = talloc_get_type_abort(
+               ctx->async.private_data, struct composite_context);
+       struct pipe_tcp_state *s = talloc_get_type_abort(
+               c->private_data, struct pipe_tcp_state);
        
        /* receive result socket open request */
        c->status = dcerpc_pipe_open_socket_recv(ctx);
@@ -526,8 +589,8 @@ struct pipe_unix_state {
 */
 static void continue_unix_open_socket(struct composite_context *ctx)
 {
-       struct composite_context *c = talloc_get_type(ctx->async.private_data,
-                                                     struct composite_context);
+       struct composite_context *c = talloc_get_type_abort(
+               ctx->async.private_data, struct composite_context);
 
        c->status = dcerpc_pipe_open_socket_recv(ctx);
        if (NT_STATUS_IS_OK(c->status)) {
@@ -593,8 +656,8 @@ NTSTATUS dcerpc_pipe_open_unix_stream_recv(struct composite_context *c)
 */
 static void continue_np_open_socket(struct composite_context *ctx)
 {
-       struct composite_context *c = talloc_get_type(ctx->async.private_data,
-                                                     struct composite_context);
+       struct composite_context *c = talloc_get_type_abort(
+               ctx->async.private_data, struct composite_context);
 
        c->status = dcerpc_pipe_open_socket_recv(ctx);
        if (!composite_is_ok(c)) return;
@@ -667,13 +730,15 @@ NTSTATUS dcerpc_pipe_open_pipe(struct dcecli_connection *conn, const char *ncalr
 
 const char *dcerpc_unix_socket_path(struct dcecli_connection *p)
 {
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
        return sock->path;
 }
 
 struct socket_address *dcerpc_socket_peer_addr(struct dcecli_connection *p, TALLOC_CTX *mem_ctx)
 {
-       struct sock_private *sock = (struct sock_private *)p->transport.private_data;
-       return socket_get_peer_addr(sock->sock, mem_ctx);
+       struct sock_private *sock = talloc_get_type_abort(
+               p->transport.private_data, struct sock_private);
+       return socket_address_copy(mem_ctx, sock->peer_addr);
 }