r12608: Remove some unused #include lines.
[sfrench/samba-autobuild/.git] / source4 / librpc / rpc / dcerpc_sock.c
index 11d016d8819a86501757fa2b61c3fc658e538ade..2867a8eaccbeebf4cea4c526c2a05728d52baf52 100644 (file)
 */
 
 #include "includes.h"
-#include "librpc/gen_ndr/ndr_epmapper.h"
-
-#define MIN_HDR_SIZE 16
-
-struct sock_blob {
-       struct sock_blob *next, *prev;
-       DATA_BLOB data;
-};
+#include "lib/events/events.h"
+#include "lib/socket/socket.h"
+#include "lib/stream/packet.h"
+#include "libcli/composite/composite.h"
 
 /* transport private information used by general socket pipe transports */
 struct sock_private {
-       struct event_context *event_ctx;
        struct fd_event *fde;
        struct socket_context *sock;
        char *server_name;
-       uint32_t port;
 
-       struct sock_blob *pending_send;
-
-       struct {
-               size_t received;
-               DATA_BLOB data;
-               uint_t pending_count;
-       } recv;
+       struct packet_context *packet;
+       uint32_t pending_reads;
 };
 
 
 /*
   mark the socket dead
 */
-static void sock_dead(struct dcerpc_pipe *p, NTSTATUS status)
+static void sock_dead(struct dcerpc_connection *p, NTSTATUS status)
 {
        struct sock_private *sock = p->transport.private;
 
        if (sock && sock->sock != NULL) {
+               talloc_free(sock->fde);
                talloc_free(sock->sock);
                sock->sock = NULL;
        }
 
-       /* wipe any pending sends */
-       while (sock->pending_send) {
-               struct sock_blob *blob = sock->pending_send;
-               DLIST_REMOVE(sock->pending_send, blob);
-               talloc_free(blob);
-       }
-
        if (!NT_STATUS_IS_OK(status)) {
                p->transport.recv_data(p, NULL, status);
        }
 }
 
+
 /*
-  process send requests
+  handle socket recv errors
 */
-static void sock_process_send(struct dcerpc_pipe *p)
+static void sock_error_handler(void *private, NTSTATUS status)
 {
-       struct sock_private *sock = p->transport.private;
+       struct dcerpc_connection *p = talloc_get_type(private, 
+                                                     struct dcerpc_connection);
+       sock_dead(p, status);
+}
 
-       while (sock->pending_send) {
-               struct sock_blob *blob = sock->pending_send;
-               NTSTATUS status;
-               size_t sent;
-               status = socket_send(sock->sock, &blob->data, &sent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       sock_dead(p, NT_STATUS_NET_WRITE_FAULT);
-                       break;
-               }
-               if (sent == 0) {
-                       break;
-               }
-
-               blob->data.data += sent;
-               blob->data.length -= sent;
-
-               if (blob->data.length != 0) {
-                       break;
-               }
-
-               DLIST_REMOVE(sock->pending_send, blob);
-               talloc_free(blob);
+/*
+  check if a blob is a complete packet
+*/
+static NTSTATUS sock_complete_packet(void *private, DATA_BLOB blob, size_t *size)
+{
+       if (blob.length < DCERPC_FRAG_LEN_OFFSET+2) {
+               return STATUS_MORE_ENTRIES;
        }
-
-       if (sock->pending_send == NULL) {
-               sock->fde->flags &= ~EVENT_FD_WRITE;
+       *size = dcerpc_get_frag_length(&blob);
+       if (*size > blob.length) {
+               return STATUS_MORE_ENTRIES;
        }
+       return NT_STATUS_OK;
 }
 
-
 /*
   process recv requests
 */
-static void sock_process_recv(struct dcerpc_pipe *p)
+static NTSTATUS sock_process_recv(void *private, DATA_BLOB blob)
 {
+       struct dcerpc_connection *p = talloc_get_type(private, 
+                                                     struct dcerpc_connection);
        struct sock_private *sock = p->transport.private;
-       NTSTATUS status;
-       size_t nread;
-
-       if (sock->recv.data.data == NULL) {
-               sock->recv.data = data_blob_talloc(sock, NULL, MIN_HDR_SIZE);
-       }
-
-       /* read in the base header to get the fragment length */
-       if (sock->recv.received < MIN_HDR_SIZE) {
-               uint32_t frag_length;
-
-               status = socket_recv(sock->sock, 
-                                    sock->recv.data.data + sock->recv.received, 
-                                    MIN_HDR_SIZE - sock->recv.received, 
-                                    &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       sock_dead(p, NT_STATUS_NET_WRITE_FAULT);
-                       return;
-               }
-               if (nread == 0) {
-                       return;
-               }
-               
-               sock->recv.received += nread;
-
-               if (sock->recv.received != MIN_HDR_SIZE) {
-                       return;
-               }
-               frag_length = dcerpc_get_frag_length(&sock->recv.data);
-
-               sock->recv.data.data = talloc_realloc(sock, sock->recv.data.data,
-                                                    frag_length);
-               if (sock->recv.data.data == NULL) {
-                       sock_dead(p, NT_STATUS_NO_MEMORY);
-                       return;
-               }
-               sock->recv.data.length = frag_length;
-       }
-
-       /* read in the rest of the packet */
-       status = socket_recv(sock->sock, 
-                            sock->recv.data.data + sock->recv.received, 
-                            sock->recv.data.length - sock->recv.received, 
-                            &nread, 0);
-       if (NT_STATUS_IS_ERR(status)) {
-               sock_dead(p, NT_STATUS_NET_WRITE_FAULT);
-               return;
-       }
-       if (nread == 0) {
-               return;
-       }
-       sock->recv.received += nread;
-
-       if (sock->recv.received != sock->recv.data.length) {
-               return;
-       }
-
-       /* we have a full packet */
-       p->transport.recv_data(p, &sock->recv.data, NT_STATUS_OK);
-       talloc_free(sock->recv.data.data);
-       sock->recv.data = data_blob(NULL, 0);
-       sock->recv.received = 0;
-       sock->recv.pending_count--;
-       if (sock->recv.pending_count == 0) {
-               sock->fde->flags &= ~EVENT_FD_READ;
+       sock->pending_reads--;
+       if (sock->pending_reads == 0) {
+               packet_recv_disable(sock->packet);
        }
+       p->transport.recv_data(p, &blob, NT_STATUS_OK);
+       return NT_STATUS_OK;
 }
 
 /*
   called when a IO is triggered by the events system
 */
 static void sock_io_handler(struct event_context *ev, struct fd_event *fde, 
-                          time_t t, uint16_t flags)
+                           uint16_t flags, void *private)
 {
-       struct dcerpc_pipe *p = fde->private;
+       struct dcerpc_connection *p = talloc_get_type(private, 
+                                                     struct dcerpc_connection);
        struct sock_private *sock = p->transport.private;
 
        if (flags & EVENT_FD_WRITE) {
-               sock_process_send(p);
+               packet_queue_run(sock->packet);
+               return;
        }
 
        if (sock->sock == NULL) {
@@ -202,20 +118,19 @@ static void sock_io_handler(struct event_context *ev, struct fd_event *fde,
        }
 
        if (flags & EVENT_FD_READ) {
-               sock_process_recv(p);
+               packet_recv(sock->packet);
        }
 }
 
 /* 
-   initiate a read request 
+   initiate a read request - not needed for dcerpc sockets
 */
-static NTSTATUS sock_send_read(struct dcerpc_pipe *p)
+static NTSTATUS sock_send_read(struct dcerpc_connection *p)
 {
        struct sock_private *sock = p->transport.private;
-
-       sock->recv.pending_count++;
-       if (sock->recv.pending_count == 1) {
-               sock->fde->flags |= EVENT_FD_READ;
+       sock->pending_reads++;
+       if (sock->pending_reads == 1) {
+               packet_recv_enable(sock->packet);
        }
        return NT_STATUS_OK;
 }
@@ -223,25 +138,26 @@ static NTSTATUS sock_send_read(struct dcerpc_pipe *p)
 /* 
    send an initial pdu in a multi-pdu sequence
 */
-static NTSTATUS sock_send_request(struct dcerpc_pipe *p, DATA_BLOB *data, BOOL trigger_read)
+static NTSTATUS sock_send_request(struct dcerpc_connection *p, DATA_BLOB *data, 
+                                 BOOL trigger_read)
 {
        struct sock_private *sock = p->transport.private;
-       struct sock_blob *blob;
+       DATA_BLOB blob;
+       NTSTATUS status;
 
-       blob = talloc_p(sock, struct sock_blob);
-       if (blob == NULL) {
-               return NT_STATUS_NO_MEMORY;
+       if (sock->sock == NULL) {
+               return NT_STATUS_CONNECTION_DISCONNECTED;
        }
 
-       blob->data = data_blob_talloc(blob, data->data, data->length);
-       if (blob->data.data == NULL) {
-               talloc_free(blob);
+       blob = data_blob_talloc(sock->packet, data->data, data->length);
+       if (blob.data == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
 
-       DLIST_ADD_END(sock->pending_send, blob, struct sock_blob *);
-
-       sock->fde->flags |= EVENT_FD_WRITE;
+       status = packet_send(sock->packet, blob);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
 
        if (trigger_read) {
                sock_send_read(p);
@@ -251,21 +167,15 @@ static NTSTATUS sock_send_request(struct dcerpc_pipe *p, DATA_BLOB *data, BOOL t
 }
 
 /* 
-   return the event context so the caller can process asynchronously
+   shutdown sock pipe connection
 */
-static struct event_context *sock_event_context(struct dcerpc_pipe *p)
+static NTSTATUS sock_shutdown_pipe(struct dcerpc_connection *p)
 {
        struct sock_private *sock = p->transport.private;
 
-       return sock->event_ctx;
-}
-
-/* 
-   shutdown sock pipe connection
-*/
-static NTSTATUS sock_shutdown_pipe(struct dcerpc_pipe *p)
-{
-       sock_dead(p, NT_STATUS_OK);
+       if (sock && sock->sock) {
+               sock_dead(p, NT_STATUS_OK);
+       }
 
        return NT_STATUS_OK;
 }
@@ -273,118 +183,205 @@ static NTSTATUS sock_shutdown_pipe(struct dcerpc_pipe *p)
 /*
   return sock server name
 */
-static const char *sock_peer_name(struct dcerpc_pipe *p)
+static const char *sock_peer_name(struct dcerpc_connection *p)
 {
        struct sock_private *sock = p->transport.private;
        return sock->server_name;
 }
 
-/* 
-   open a rpc connection using the generic socket library
-*/
-static NTSTATUS dcerpc_pipe_open_socket(struct dcerpc_pipe **p, 
-                                       const char *server,
-                                       uint32_t port, 
-                                       const char *type,
-                                       enum dcerpc_transport_t transport)
+
+struct pipe_open_socket_state {
+       struct dcerpc_connection *conn;
+       struct socket_context *socket_ctx;
+       struct sock_private *sock;
+       const char *server;
+       uint32_t port;
+       enum dcerpc_transport_t transport;
+};
+
+
+static void continue_socket_connect(struct composite_context *ctx)
 {
+       struct dcerpc_connection *conn;
        struct sock_private *sock;
-       struct socket_context *socket_ctx;
-       struct fd_event fde;
-       NTSTATUS status;
+       struct composite_context *c = talloc_get_type(ctx->async.private_data,
+                                                     struct composite_context);
+       struct pipe_open_socket_state *s = talloc_get_type(c->private_data,
+                                                          struct pipe_open_socket_state);
+
+       /* make it easier to write a function calls */
+       conn = s->conn;
+       sock = s->sock;
+
+       c->status = socket_connect_recv(ctx);
+       if (!NT_STATUS_IS_OK(c->status)) {
+               DEBUG(0, ("Failed to connect host %s on port %d - %s\n", s->server, s->port,
+                         nt_errstr(c->status)));
+               composite_error(c, c->status);
+               return;
+       }
+
+       /*
+         fill in the transport methods
+       */
+       conn->transport.transport     = s->transport;
+       conn->transport.private       = NULL;
+
+       conn->transport.send_request  = sock_send_request;
+       conn->transport.send_read     = sock_send_read;
+       conn->transport.recv_data     = NULL;
+
+       conn->transport.shutdown_pipe = sock_shutdown_pipe;
+       conn->transport.peer_name     = sock_peer_name;
 
-       if (port == 0) {
-               port = EPMAPPER_PORT;
+       sock->sock          = s->socket_ctx;
+       sock->pending_reads = 0;
+       sock->server_name   = strupper_talloc(sock, s->server);
+
+       sock->fde = event_add_fd(conn->event_ctx, sock->sock, socket_get_fd(sock->sock),
+                                0, sock_io_handler, conn);
+       
+       conn->transport.private = sock;
+
+       sock->packet = packet_init(sock);
+       if (sock->packet == NULL) {
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               talloc_free(sock);
+               return;
        }
 
-       if (!(*p = dcerpc_pipe_init())) {
-                return NT_STATUS_NO_MEMORY;
+       packet_set_private(sock->packet, conn);
+       packet_set_socket(sock->packet, sock->sock);
+       packet_set_callback(sock->packet, sock_process_recv);
+       packet_set_full_request(sock->packet, sock_complete_packet);
+       packet_set_error_handler(sock->packet, sock_error_handler);
+       packet_set_event_context(sock->packet, conn->event_ctx);
+       packet_set_fde(sock->packet, sock->fde);
+       packet_set_serialise(sock->packet);
+       packet_recv_disable(sock->packet);
+       packet_set_initial_read(sock->packet, 16);
+
+       /* ensure we don't get SIGPIPE */
+       BlockSignals(True,SIGPIPE);
+
+       composite_done(c);
+}
+
+
+struct composite_context *dcerpc_pipe_open_socket_send(TALLOC_CTX *mem_ctx,
+                                                      struct dcerpc_connection *cn,
+                                                      const char *server,
+                                                      uint32_t port, 
+                                                      const char *type,
+                                                      enum dcerpc_transport_t transport)
+{
+       NTSTATUS status;
+       struct composite_context *c;
+       struct pipe_open_socket_state *s;
+       struct composite_context *conn_req;
+
+       c = talloc_zero(mem_ctx, struct composite_context);
+       if (c == NULL) return NULL;
+
+       s = talloc_zero(c, struct pipe_open_socket_state);
+       if (s == NULL) {
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               goto done;
        }
-       sock = talloc_p((*p), struct sock_private);
-       if (!sock) {
-               talloc_free(*p);
-               return NT_STATUS_NO_MEMORY;
+
+       c->state = COMPOSITE_STATE_IN_PROGRESS;
+       c->private_data = s;
+       c->event_ctx = cn->event_ctx;
+
+       s->conn      = cn;
+       s->transport = transport;
+       s->port      = port;
+       s->server    = talloc_strdup(c, server);
+       if (s->server == NULL) {
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               goto done;
        }
 
-       status = socket_create(type, SOCKET_TYPE_STREAM, &socket_ctx, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(*p);
-               return status;
+       s->sock = talloc(cn, struct sock_private);
+       if (s->sock == NULL) {
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               goto done;
        }
-       talloc_steal(sock, socket_ctx);
 
-       status = socket_connect(socket_ctx, NULL, 0, server, port, 0);
+       status = socket_create(type, SOCKET_TYPE_STREAM, &s->socket_ctx, 0);
        if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(*p);
-               return status;
+               composite_error(c, status);
+               talloc_free(s->sock);
+               goto done;
        }
+       talloc_steal(s->sock, s->socket_ctx);
 
-       /*
-         fill in the transport methods
-       */
-       (*p)->transport.transport = transport;
-       (*p)->transport.private = NULL;
-
-       (*p)->transport.send_request = sock_send_request;
-       (*p)->transport.send_read = sock_send_read;
-       (*p)->transport.event_context = sock_event_context;
-       (*p)->transport.recv_data = NULL;
-
-       (*p)->transport.shutdown_pipe = sock_shutdown_pipe;
-       (*p)->transport.peer_name = sock_peer_name;
+       conn_req = socket_connect_send(s->socket_ctx, NULL, 0, s->server, s->port, 0, c->event_ctx);
+       if (conn_req == NULL) {
+               composite_error(c, NT_STATUS_NO_MEMORY);
+               goto done;
+       }
        
-       sock->sock = socket_ctx;
-       sock->server_name = talloc_strdup((*p), server);
-       sock->event_ctx = event_context_init(sock);
-       sock->pending_send = NULL;
-       sock->recv.received = 0;
-       sock->recv.data = data_blob(NULL, 0);
-       sock->recv.pending_count = 0;
+       composite_continue(c, conn_req, continue_socket_connect, c);
 
-       fde.fd = socket_get_fd(sock->sock);
-       fde.flags = 0;
-       fde.handler = sock_io_handler;
-       fde.private = *p;
+done:
+       return c;
+}
 
-       sock->fde = event_add_fd(sock->event_ctx, &fde);
 
-       (*p)->transport.private = sock;
+NTSTATUS dcerpc_pipe_open_socket_recv(struct composite_context *c)
+{
+       NTSTATUS status = composite_wait(c);
 
-       /* ensure we don't get SIGPIPE */
-       BlockSignals(True,SIGPIPE);
+       talloc_free(c);
+       return status;
+}
 
-       return NT_STATUS_OK;
+/* 
+   open a rpc connection using the generic socket library
+*/
+NTSTATUS dcerpc_pipe_open_socket(struct dcerpc_connection *conn,
+                                const char *server,
+                                uint32_t port, 
+                                const char *type,
+                                enum dcerpc_transport_t transport)
+{
+       struct composite_context *c;
+       
+       c = dcerpc_pipe_open_socket_send(conn, conn, server, port,
+                                        type, transport);
+       return dcerpc_pipe_open_socket_recv(c);
 }
 
+
 /* 
    open a rpc connection using tcp
 */
-NTSTATUS dcerpc_pipe_open_tcp(struct dcerpc_pipe **p, const char *server, uint32_t port)
+NTSTATUS dcerpc_pipe_open_tcp(struct dcerpc_connection *c, const char *server, uint32_t port)
 {
        NTSTATUS status;
        
        /* Try IPv6 first */
-       status = dcerpc_pipe_open_socket(p, server, port, "ipv6", NCACN_IP_TCP);
+       status = dcerpc_pipe_open_socket(c, server, port, "ipv6", NCACN_IP_TCP);
        if (NT_STATUS_IS_OK(status)) {
                return status;
        }
        
-       return dcerpc_pipe_open_socket(p, server, port, "ipv4", NCACN_IP_TCP);
+       return dcerpc_pipe_open_socket(c, server, port, "ipv4", NCACN_IP_TCP);
 }
 
 /* 
    open a rpc connection to a unix socket 
 */
-NTSTATUS dcerpc_pipe_open_unix_stream(struct dcerpc_pipe **p, const char *path)
+NTSTATUS dcerpc_pipe_open_unix_stream(struct dcerpc_connection *c, const char *path)
 {
-       return dcerpc_pipe_open_socket(p, path, 0, "unix", NCACN_UNIX_STREAM);
+       return dcerpc_pipe_open_socket(c, path, 0, "unix", NCACN_UNIX_STREAM);
 }
 
 /* 
    open a rpc connection to a named pipe 
 */
-NTSTATUS dcerpc_pipe_open_pipe(struct dcerpc_pipe **p, const char *identifier)
+NTSTATUS dcerpc_pipe_open_pipe(struct dcerpc_connection *c, const char *identifier)
 {
        NTSTATUS status;
        char *canon, *full_path;
@@ -394,7 +391,7 @@ NTSTATUS dcerpc_pipe_open_pipe(struct dcerpc_pipe **p, const char *identifier)
        string_replace(canon, '/', '\\');
        full_path = talloc_asprintf(canon, "%s/%s", lp_ncalrpc_dir(), canon);
 
-       status = dcerpc_pipe_open_socket(p, full_path, 0, "unix", NCALRPC);
+       status = dcerpc_pipe_open_socket(c, full_path, 0, "unix", NCALRPC);
        talloc_free(canon);
 
        return status;