Finish removal of iconv_convenience in public API's.
[bbaumbach/samba-autobuild/.git] / source4 / wrepl_server / wrepl_in_connection.c
index f2cf7c6113a8ad62b227b0d7aa8b6c15eedf8c99..364ebc7fa913c22151e3de62edb48b1b8ac4fd8b 100644 (file)
@@ -31,6 +31,8 @@
 #include "smbd/process_model.h"
 #include "system/network.h"
 #include "lib/socket/netif.h"
+#include "lib/tsocket/tsocket.h"
+#include "libcli/util/tstream.h"
 #include "param/param.h"
 
 void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
@@ -38,35 +40,18 @@ void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn,
        stream_terminate_connection(wreplconn->conn, reason);
 }
 
-static int terminate_after_send_destructor(struct wreplsrv_in_connection **tas)
-{
-       wreplsrv_terminate_in_connection(*tas, "wreplsrv_in_connection: terminate_after_send");
-       return 0;
-}
-
 /*
   receive some data on a WREPL connection
 */
-static NTSTATUS wreplsrv_recv_request(void *private, DATA_BLOB blob)
+static NTSTATUS wreplsrv_process(struct wreplsrv_in_connection *wrepl_conn,
+                                struct wreplsrv_in_call **_call)
 {
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(private, struct wreplsrv_in_connection);
-       struct wreplsrv_in_call *call;
-       DATA_BLOB packet_in_blob;
-       DATA_BLOB packet_out_blob;
        struct wrepl_wrap packet_out_wrap;
        NTSTATUS status;
        enum ndr_err_code ndr_err;
+       struct wreplsrv_in_call *call = *_call;
 
-       call = talloc_zero(wreplconn, struct wreplsrv_in_call);
-       NT_STATUS_HAVE_NO_MEMORY(call);
-       call->wreplconn = wreplconn;
-       talloc_steal(call, blob.data);
-
-       packet_in_blob.data = blob.data + 4;
-       packet_in_blob.length = blob.length - 4;
-
-       ndr_err = ndr_pull_struct_blob(&packet_in_blob, call, 
-                                      lp_iconv_convenience(global_loadparm),
+       ndr_err = ndr_pull_struct_blob(&call->in, call,
                                       &call->req_packet,
                                       (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
@@ -74,8 +59,8 @@ static NTSTATUS wreplsrv_recv_request(void *private, DATA_BLOB blob)
        }
 
        if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS-Replication packet of length %u\n", 
-                         (unsigned)packet_in_blob.length + 4));
+               DEBUG(10,("Received WINS-Replication packet of length %u\n",
+                         (unsigned int) call->in.length + 4));
                NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
        }
 
@@ -84,114 +69,282 @@ static NTSTATUS wreplsrv_recv_request(void *private, DATA_BLOB blob)
        if (!NT_STATUS_IS_OK(status)) {
                /* w2k just ignores invalid packets, so we do */
                DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
-               talloc_free(call);
+               TALLOC_FREE(call);
+               *_call = NULL;
                return NT_STATUS_OK;
        }
 
        /* and now encode the reply */
        packet_out_wrap.packet = call->rep_packet;
-       ndr_err = ndr_push_struct_blob(&packet_out_blob, call, 
-                                      lp_iconv_convenience(global_loadparm),
+       ndr_err = ndr_push_struct_blob(&call->out, call,
                                       &packet_out_wrap,
-                                     (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
+                                      (ndr_push_flags_fn_t) ndr_push_wrepl_wrap);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                return ndr_map_error2ntstatus(ndr_err);
        }
 
        if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
+               DEBUG(10,("Sending WINS-Replication packet of length %u\n",
+                        (unsigned int) call->out.length));
                NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
        }
 
-       if (call->terminate_after_send) {
-               struct wreplsrv_in_connection **tas;
-               tas = talloc(packet_out_blob.data, struct wreplsrv_in_connection *);
-               NT_STATUS_HAVE_NO_MEMORY(tas);
-               *tas = wreplconn;
-               talloc_set_destructor(tas, terminate_after_send_destructor);
-       }
-
-       status = packet_send(wreplconn->packet, packet_out_blob);
-       NT_STATUS_NOT_OK_RETURN(status);
-
-       talloc_free(call);
        return NT_STATUS_OK;
 }
 
+static void wreplsrv_call_loop(struct tevent_req *subreq);
+
 /*
-  called when the socket becomes readable
+  called when we get a new connection
 */
-static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
+static void wreplsrv_accept(struct stream_connection *conn)
 {
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private,
-                                                                  struct wreplsrv_in_connection);
+       struct wreplsrv_service *service = talloc_get_type(conn->private_data, struct wreplsrv_service);
+       struct wreplsrv_in_connection *wrepl_conn;
+       struct tsocket_address *peer_addr;
+       char *peer_ip;
+       struct tevent_req *subreq;
+       int rc, fd;
+
+       wrepl_conn = talloc_zero(conn, struct wreplsrv_in_connection);
+       if (wrepl_conn == NULL) {
+               stream_terminate_connection(conn,
+                                           "wreplsrv_accept: out of memory");
+               return;
+       }
 
-       packet_recv(wreplconn->packet);
-}
+       wrepl_conn->send_queue = tevent_queue_create(conn, "wrepl_accept");
+       if (wrepl_conn->send_queue == NULL) {
+               stream_terminate_connection(conn,
+                                           "wrepl_accept: out of memory");
+               return;
+       }
 
-/*
-  called when the socket becomes writable
-*/
-static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
-{
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private,
-                                                                  struct wreplsrv_in_connection);
-       packet_queue_run(wreplconn->packet);
-}
+       TALLOC_FREE(conn->event.fde);
 
-/*
-  handle socket recv errors
-*/
-static void wreplsrv_recv_error(void *private, NTSTATUS status)
-{
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(private,
-                                                                  struct wreplsrv_in_connection);
-       wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status));
+       /*
+        * Clone the fd that the connection isn't closed if we create a client
+        * connection.
+        */
+       fd = dup(socket_get_fd(conn->socket));
+       if (fd == -1) {
+               char *reason;
+
+               reason = talloc_asprintf(conn,
+                                        "wrepl_accept: failed to duplicate the file descriptor - %s",
+                                        strerror(errno));
+               if (reason == NULL) {
+                       reason = strerror(errno);
+               }
+               stream_terminate_connection(conn, reason);
+       }
+       rc = tstream_bsd_existing_socket(wrepl_conn,
+                                        fd,
+                                        &wrepl_conn->tstream);
+       if (rc < 0) {
+               stream_terminate_connection(conn,
+                                           "wrepl_accept: out of memory");
+               return;
+       }
+
+       wrepl_conn->conn = conn;
+       wrepl_conn->service = service;
+
+       peer_addr = conn->remote_address;
+
+       if (!tsocket_address_is_inet(peer_addr, "ipv4")) {
+               DEBUG(0,("wreplsrv_accept: non ipv4 peer addr '%s'\n",
+                       tsocket_address_string(peer_addr, wrepl_conn)));
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_accept: "
+                               "invalid peer IP");
+               return;
+       }
+
+       peer_ip = tsocket_address_inet_addr_string(peer_addr, wrepl_conn);
+       if (peer_ip == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_accept: "
+                               "could not convert peer IP into a string");
+               return;
+       }
+
+       wrepl_conn->partner = wreplsrv_find_partner(service, peer_ip);
+       irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
+
+       /*
+        * The wrepl pdu's has the length as 4 byte (initial_read_size),
+        * packet_full_request_u32 provides the pdu length then.
+        */
+       subreq = tstream_read_pdu_blob_send(wrepl_conn,
+                                           wrepl_conn->conn->event.ctx,
+                                           wrepl_conn->tstream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           wrepl_conn);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wrepl_accept: "
+                               "no memory for tstream_read_pdu_blob_send");
+               return;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_loop, wrepl_conn);
 }
 
-/*
-  called when we get a new connection
-*/
-static void wreplsrv_accept(struct stream_connection *conn)
+static void wreplsrv_call_writev_done(struct tevent_req *subreq);
+
+static void wreplsrv_call_loop(struct tevent_req *subreq)
 {
-       struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
-       struct wreplsrv_in_connection *wreplconn;
-       struct socket_address *peer_ip;
+       struct wreplsrv_in_connection *wrepl_conn = tevent_req_callback_data(subreq,
+                                     struct wreplsrv_in_connection);
+       struct wreplsrv_in_call *call;
+       NTSTATUS status;
 
-       wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
-       if (!wreplconn) {
-               stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
+       call = talloc_zero(wrepl_conn, struct wreplsrv_in_call);
+       if (call == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_call_loop: "
+                               "no memory for wrepl_samba3_call");
                return;
        }
+       call->wreplconn = wrepl_conn;
+
+       status = tstream_read_pdu_blob_recv(subreq,
+                                           call,
+                                           &call->in);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               const char *reason;
+
+               reason = talloc_asprintf(call, "wreplsrv_call_loop: "
+                                        "tstream_read_pdu_blob_recv() - %s",
+                                        nt_errstr(status));
+               if (!reason) {
+                       reason = nt_errstr(status);
+               }
 
-       wreplconn->packet = packet_init(wreplconn);
-       if (!wreplconn->packet) {
-               wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
+               wreplsrv_terminate_in_connection(wrepl_conn, reason);
                return;
        }
-       packet_set_private(wreplconn->packet, wreplconn);
-       packet_set_socket(wreplconn->packet, conn->socket);
-       packet_set_callback(wreplconn->packet, wreplsrv_recv_request);
-       packet_set_full_request(wreplconn->packet, packet_full_request_u32);
-       packet_set_error_handler(wreplconn->packet, wreplsrv_recv_error);
-       packet_set_event_context(wreplconn->packet, conn->event.ctx);
-       packet_set_fde(wreplconn->packet, conn->event.fde);
-       packet_set_serialise(wreplconn->packet);
-
-       wreplconn->conn         = conn;
-       wreplconn->service      = service;
-
-       peer_ip = socket_get_peer_addr(conn->socket, wreplconn);
-       if (!peer_ip) {
-               wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: could not obtain peer IP from kernel");
+
+       DEBUG(10,("Received wrepl packet of length %lu from %s\n",
+                (long) call->in.length,
+                tsocket_address_string(wrepl_conn->conn->remote_address, call)));
+
+       /* skip length header */
+       call->in.data += 4;
+       call->in.length -= 4;
+
+       status = wreplsrv_process(wrepl_conn, &call);
+       if (!NT_STATUS_IS_OK(status)) {
+               const char *reason;
+
+               reason = talloc_asprintf(call, "wreplsrv_call_loop: "
+                                        "tstream_read_pdu_blob_recv() - %s",
+                                        nt_errstr(status));
+               if (reason == NULL) {
+                       reason = nt_errstr(status);
+               }
+
+               wreplsrv_terminate_in_connection(wrepl_conn, reason);
                return;
        }
 
-       wreplconn->partner      = wreplsrv_find_partner(service, peer_ip->addr);
+       /* We handed over the connection so we're done here */
+       if (wrepl_conn->tstream == NULL) {
+           return;
+       }
 
-       conn->private = wreplconn;
+       /* Invalid WINS-Replication packet, we just ignore it */
+       if (call == NULL) {
+               goto noreply;
+       }
 
-       irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
+       call->out_iov[0].iov_base = call->out.data;
+       call->out_iov[0].iov_len = call->out.length;
+
+       subreq = tstream_writev_queue_send(call,
+                                          wrepl_conn->conn->event.ctx,
+                                          wrepl_conn->tstream,
+                                          wrepl_conn->send_queue,
+                                          call->out_iov, 1);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_call_loop: "
+                               "no memory for tstream_writev_queue_send");
+               return;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_writev_done, call);
+
+noreply:
+       /*
+        * The wrepl pdu's has the length as 4 byte (initial_read_size),
+        *  provides the pdu length then.
+        */
+       subreq = tstream_read_pdu_blob_send(wrepl_conn,
+                                           wrepl_conn->conn->event.ctx,
+                                           wrepl_conn->tstream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           wrepl_conn);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_call_loop: "
+                               "no memory for tstream_read_pdu_blob_send");
+               return;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_loop, wrepl_conn);
+}
+
+static void wreplsrv_call_writev_done(struct tevent_req *subreq)
+{
+       struct wreplsrv_in_call *call = tevent_req_callback_data(subreq,
+                       struct wreplsrv_in_call);
+       int sys_errno;
+       int rc;
+
+       rc = tstream_writev_queue_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (rc == -1) {
+               const char *reason;
+
+               reason = talloc_asprintf(call, "wreplsrv_call_writev_done: "
+                                        "tstream_writev_queue_recv() - %d:%s",
+                                        sys_errno, strerror(sys_errno));
+               if (reason == NULL) {
+                       reason = "wreplsrv_call_writev_done: "
+                                "tstream_writev_queue_recv() failed";
+               }
+
+               wreplsrv_terminate_in_connection(call->wreplconn, reason);
+               return;
+       }
+
+       if (call->terminate_after_send) {
+               wreplsrv_terminate_in_connection(call->wreplconn,
+                               "wreplsrv_in_connection: terminate_after_send");
+               return;
+       }
+
+       talloc_free(call);
+}
+
+/*
+  called on a tcp recv
+*/
+static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
+{
+       struct wreplsrv_in_connection *wrepl_conn = talloc_get_type(conn->private_data,
+                                                       struct wreplsrv_in_connection);
+       /* this should never be triggered! */
+       DEBUG(0,("Terminating connection - '%s'\n", "wrepl_recv: called"));
+       wreplsrv_terminate_in_connection(wrepl_conn, "wrepl_recv: called");
+}
+
+/*
+  called when we can write to a connection
+*/
+static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
+{
+       struct wreplsrv_in_connection *wrepl_conn = talloc_get_type(conn->private_data,
+                                                       struct wreplsrv_in_connection);
+       /* this should never be triggered! */
+       DEBUG(0,("Terminating connection - '%s'\n", "wrepl_send: called"));
+       wreplsrv_terminate_in_connection(wrepl_conn, "wrepl_send: called");
 }
 
 static const struct stream_server_ops wreplsrv_stream_ops = {
@@ -205,20 +358,21 @@ static const struct stream_server_ops wreplsrv_stream_ops = {
   called when we get a new connection
 */
 NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
-                                     struct socket_context *sock,
-                                     struct packet_context *packet,
+                                     uint32_t peer_assoc_ctx,
+                                     struct tstream_context **stream,
                                      struct wreplsrv_in_connection **_wrepl_in)
 {
        struct wreplsrv_service *service = partner->service;
        struct wreplsrv_in_connection *wrepl_in;
        const struct model_ops *model_ops;
        struct stream_connection *conn;
+       struct tevent_req *subreq;
        NTSTATUS status;
 
        /* within the wrepl task we want to be a single process, so
           ask for the single process model ops and pass these to the
           stream_setup_socket() call. */
-       model_ops = process_model_byname("single");
+       model_ops = process_model_startup(service->task->event_ctx, "single");
        if (!model_ops) {
                DEBUG(0,("Can't find 'single' process model_ops"));
                return NT_STATUS_INTERNAL_ERROR;
@@ -229,33 +383,51 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
 
        wrepl_in->service       = service;
        wrepl_in->partner       = partner;
-
-       status = stream_new_connection_merge(service->task->event_ctx, model_ops,
-                                            sock, &wreplsrv_stream_ops, service->task->msg_ctx,
-                                            wrepl_in, &conn);
+       wrepl_in->tstream       = talloc_move(wrepl_in, stream);
+       wrepl_in->assoc_ctx.peer_ctx = peer_assoc_ctx;
+
+       status = stream_new_connection_merge(service->task->event_ctx,
+                                            service->task->lp_ctx,
+                                            model_ops,
+                                            &wreplsrv_stream_ops,
+                                            service->task->msg_ctx,
+                                            wrepl_in,
+                                            &conn);
        NT_STATUS_NOT_OK_RETURN(status);
 
        /*
-        * make the wreplsrv_in_connection structure a child of the 
-        * stream_connection, to match the hierachie of wreplsrv_accept
+        * make the wreplsrv_in_connection structure a child of the
+        * stream_connection, to match the hierarchy of wreplsrv_accept
         */
        wrepl_in->conn          = conn;
        talloc_steal(conn, wrepl_in);
 
+       wrepl_in->send_queue = tevent_queue_create(wrepl_in, "wreplsrv_in_connection_merge");
+       if (wrepl_in->send_queue == NULL) {
+               stream_terminate_connection(conn,
+                                           "wreplsrv_in_connection_merge: out of memory");
+               return NT_STATUS_NO_MEMORY;
+       }
+
        /*
-        * now update the packet handling callback,...
+        * The wrepl pdu's has the length as 4 byte (initial_read_size),
+        * packet_full_request_u32 provides the pdu length then.
         */
-       wrepl_in->packet        = talloc_steal(wrepl_in, packet);
-       packet_set_private(wrepl_in->packet, wrepl_in);
-       packet_set_socket(wrepl_in->packet, conn->socket);
-       packet_set_callback(wrepl_in->packet, wreplsrv_recv_request);
-       packet_set_full_request(wrepl_in->packet, packet_full_request_u32);
-       packet_set_error_handler(wrepl_in->packet, wreplsrv_recv_error);
-       packet_set_event_context(wrepl_in->packet, conn->event.ctx);
-       packet_set_fde(wrepl_in->packet, conn->event.fde);
-       packet_set_serialise(wrepl_in->packet);
+       subreq = tstream_read_pdu_blob_send(wrepl_in,
+                                           wrepl_in->conn->event.ctx,
+                                           wrepl_in->tstream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           wrepl_in);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_in, "wreplsrv_in_connection_merge: "
+                               "no memory for tstream_read_pdu_blob_send");
+               return NT_STATUS_NO_MEMORY;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_loop, wrepl_in);
 
        *_wrepl_in = wrepl_in;
+
        return NT_STATUS_OK;
 }
 
@@ -273,7 +445,7 @@ NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service, struct loadpar
        /* within the wrepl task we want to be a single process, so
           ask for the single process model ops and pass these to the
           stream_setup_socket() call. */
-       model_ops = process_model_byname("single");
+       model_ops = process_model_startup(task->event_ctx, "single");
        if (!model_ops) {
                DEBUG(0,("Can't find 'single' process model_ops"));
                return NT_STATUS_INTERNAL_ERROR;
@@ -294,7 +466,9 @@ NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service, struct loadpar
                */
                for(i = 0; i < num_interfaces; i++) {
                        address = iface_n_ip(ifaces, i);
-                       status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
+                       status = stream_setup_socket(task->event_ctx, 
+                                                    task->lp_ctx, model_ops, 
+                                                    &wreplsrv_stream_ops,
                                                     "ipv4", address, &port, 
                                                      lp_socket_options(task->lp_ctx), 
                                                     service);
@@ -306,7 +480,8 @@ NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service, struct loadpar
                }
        } else {
                address = lp_socket_address(lp_ctx);
-               status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
+               status = stream_setup_socket(task->event_ctx, task->lp_ctx, 
+                                            model_ops, &wreplsrv_stream_ops,
                                             "ipv4", address, &port, lp_socket_options(task->lp_ctx), 
                                             service);
                if (!NT_STATUS_IS_OK(status)) {