s4-winrepl: Migrated the wins replication server to tsocket.
authorAndreas Schneider <asn@redhat.com>
Wed, 20 Jan 2010 13:21:47 +0000 (14:21 +0100)
committerStefan Metzmacher <metze@samba.org>
Wed, 3 Mar 2010 08:22:37 +0000 (09:22 +0100)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
source4/wrepl_server/wrepl_in_call.c
source4/wrepl_server/wrepl_in_connection.c
source4/wrepl_server/wrepl_out_helpers.c
source4/wrepl_server/wrepl_server.h

index fd09bbaf4011359e338d5925beb379b0f586d934..6737d6f5c8784203908ea92802d0f0fb83d369ac 100644 (file)
@@ -31,6 +31,7 @@
 #include "lib/ldb/include/ldb_errors.h"
 #include "system/time.h"
 #include "lib/util/tsort.h"
 #include "lib/ldb/include/ldb_errors.h"
 #include "system/time.h"
 #include "lib/util/tsort.h"
+#include "lib/stream/packet.h" /* FIXME */
 
 static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
 {
 
 static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
 {
@@ -341,23 +342,31 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
        struct wreplsrv_out_connection *wrepl_out;
        struct wrepl_table *update_in = &call->req_packet.message.replication.info.table;
        struct wreplsrv_in_update_state *update_state;
        struct wreplsrv_out_connection *wrepl_out;
        struct wrepl_table *update_in = &call->req_packet.message.replication.info.table;
        struct wreplsrv_in_update_state *update_state;
-       uint16_t fde_flags;
+       struct packet_context *packet;
 
        DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n",
                call->wreplconn->partner->address,
                update_in->initiator, update_in->partner_count));
 
 
        DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n",
                call->wreplconn->partner->address,
                update_in->initiator, update_in->partner_count));
 
-       /* 
-        * we need to flip the connection into a client connection
+       update_state = talloc(wrepl_in, struct wreplsrv_in_update_state);
+       NT_STATUS_HAVE_NO_MEMORY(update_state);
+
+       /*
+        * We need to flip the connection into a client connection
         * and do a WREPL_REPL_SEND_REQUEST's on the that connection
         * and do a WREPL_REPL_SEND_REQUEST's on the that connection
-        * and then stop this connection
+        * and then stop this connection.
         */
         */
-       fde_flags = event_get_fd_flags(wrepl_in->conn->event.fde);
-       talloc_free(wrepl_in->conn->event.fde);
-       wrepl_in->conn->event.fde = NULL;
+       packet = packet_init(wrepl_in);
+       if (packet == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
 
 
-       update_state = talloc(wrepl_in, struct wreplsrv_in_update_state);
-       NT_STATUS_HAVE_NO_MEMORY(update_state);
+       /*
+        * TODO We can free the tstream here as we don't use it in the client
+        * yet.
+        */
+       TALLOC_FREE(wrepl_in->send_queue);
+       TALLOC_FREE(wrepl_in->tstream);
 
        wrepl_out = talloc(update_state, struct wreplsrv_out_connection);
        NT_STATUS_HAVE_NO_MEMORY(wrepl_out);
 
        wrepl_out = talloc(update_state, struct wreplsrv_out_connection);
        NT_STATUS_HAVE_NO_MEMORY(wrepl_out);
@@ -368,11 +377,9 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
        wrepl_out->sock                 = wrepl_socket_merge(wrepl_out,
                                                             wrepl_in->conn->event.ctx,
                                                             wrepl_in->conn->socket,
        wrepl_out->sock                 = wrepl_socket_merge(wrepl_out,
                                                             wrepl_in->conn->event.ctx,
                                                             wrepl_in->conn->socket,
-                                                            wrepl_in->packet);
+                                                            packet);
        NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock);
 
        NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock);
 
-       event_set_fd_flags(wrepl_out->sock->event.fde, fde_flags);
-
        update_state->wrepl_in                  = wrepl_in;
        update_state->wrepl_out                 = wrepl_out;
        update_state->cycle_io.in.partner       = wrepl_out->partner;
        update_state->wrepl_in                  = wrepl_in;
        update_state->wrepl_out                 = wrepl_out;
        update_state->cycle_io.in.partner       = wrepl_out->partner;
index 7c9c2b77bf4bf616eed52377d397b62c7c649a69..6b60dd178f1e59b2a503f2d23aacbd2e773bb6ce 100644 (file)
@@ -31,6 +31,8 @@
 #include "smbd/process_model.h"
 #include "system/network.h"
 #include "lib/socket/netif.h"
 #include "smbd/process_model.h"
 #include "system/network.h"
 #include "lib/socket/netif.h"
+#include "lib/tsocket/tsocket.h"
+#include "libcli/util/tstream.h"
 #include "param/param.h"
 
 void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
 #include "param/param.h"
 
 void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
@@ -38,35 +40,19 @@ void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn,
        stream_terminate_connection(wreplconn->conn, reason);
 }
 
        stream_terminate_connection(wreplconn->conn, reason);
 }
 
-static int terminate_after_send_destructor(struct wreplsrv_in_connection **tas)
-{
-       wreplsrv_terminate_in_connection(*tas, "wreplsrv_in_connection: terminate_after_send");
-       return 0;
-}
-
 /*
   receive some data on a WREPL connection
 */
 /*
   receive some data on a WREPL connection
 */
-static NTSTATUS wreplsrv_recv_request(void *private_data, DATA_BLOB blob)
+static NTSTATUS wreplsrv_process(struct wreplsrv_in_connection *wrepl_conn,
+                                struct wreplsrv_in_call **_call)
 {
 {
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(private_data, struct wreplsrv_in_connection);
-       struct wreplsrv_in_call *call;
-       DATA_BLOB packet_in_blob;
-       DATA_BLOB packet_out_blob;
        struct wrepl_wrap packet_out_wrap;
        NTSTATUS status;
        enum ndr_err_code ndr_err;
        struct wrepl_wrap packet_out_wrap;
        NTSTATUS status;
        enum ndr_err_code ndr_err;
+       struct wreplsrv_in_call *call = *_call;
 
 
-       call = talloc_zero(wreplconn, struct wreplsrv_in_call);
-       NT_STATUS_HAVE_NO_MEMORY(call);
-       call->wreplconn = wreplconn;
-       talloc_steal(call, blob.data);
-
-       packet_in_blob.data = blob.data + 4;
-       packet_in_blob.length = blob.length - 4;
-
-       ndr_err = ndr_pull_struct_blob(&packet_in_blob, call, 
-                                      lp_iconv_convenience(wreplconn->service->task->lp_ctx),
+       ndr_err = ndr_pull_struct_blob(&call->in, call,
+                                      lp_iconv_convenience(wrepl_conn->service->task->lp_ctx),
                                       &call->req_packet,
                                       (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                                       &call->req_packet,
                                       (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
@@ -74,8 +60,8 @@ static NTSTATUS wreplsrv_recv_request(void *private_data, DATA_BLOB blob)
        }
 
        if (DEBUGLVL(10)) {
        }
 
        if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS-Replication packet of length %u\n", 
-                         (unsigned)packet_in_blob.length + 4));
+               DEBUG(10,("Received WINS-Replication packet of length %u\n",
+                         (unsigned int) call->in.length + 4));
                NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
        }
 
                NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
        }
 
@@ -84,70 +70,31 @@ static NTSTATUS wreplsrv_recv_request(void *private_data, DATA_BLOB blob)
        if (!NT_STATUS_IS_OK(status)) {
                /* w2k just ignores invalid packets, so we do */
                DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
        if (!NT_STATUS_IS_OK(status)) {
                /* w2k just ignores invalid packets, so we do */
                DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
-               talloc_free(call);
+               TALLOC_FREE(call);
+               *_call = NULL;
                return NT_STATUS_OK;
        }
 
        /* and now encode the reply */
        packet_out_wrap.packet = call->rep_packet;
                return NT_STATUS_OK;
        }
 
        /* and now encode the reply */
        packet_out_wrap.packet = call->rep_packet;
-       ndr_err = ndr_push_struct_blob(&packet_out_blob, call, 
-                                      lp_iconv_convenience(wreplconn->service->task->lp_ctx),
+       ndr_err = ndr_push_struct_blob(&call->out, call,
+                                      lp_iconv_convenience(wrepl_conn->service->task->lp_ctx),
                                       &packet_out_wrap,
                                       &packet_out_wrap,
-                                     (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
+                                      (ndr_push_flags_fn_t) ndr_push_wrepl_wrap);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                return ndr_map_error2ntstatus(ndr_err);
        }
 
        if (DEBUGLVL(10)) {
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                return ndr_map_error2ntstatus(ndr_err);
        }
 
        if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
+               DEBUG(10,("Sending WINS-Replication packet of length %u\n",
+                        (unsigned int) call->out.length));
                NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
        }
 
                NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
        }
 
-       if (call->terminate_after_send) {
-               struct wreplsrv_in_connection **tas;
-               tas = talloc(packet_out_blob.data, struct wreplsrv_in_connection *);
-               NT_STATUS_HAVE_NO_MEMORY(tas);
-               *tas = wreplconn;
-               talloc_set_destructor(tas, terminate_after_send_destructor);
-       }
-
-       status = packet_send(wreplconn->packet, packet_out_blob);
-       NT_STATUS_NOT_OK_RETURN(status);
-
-       talloc_free(call);
        return NT_STATUS_OK;
 }
 
        return NT_STATUS_OK;
 }
 
-/*
-  called when the socket becomes readable
-*/
-static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
-{
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private_data,
-                                                                  struct wreplsrv_in_connection);
-
-       packet_recv(wreplconn->packet);
-}
-
-/*
-  called when the socket becomes writable
-*/
-static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
-{
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private_data,
-                                                                  struct wreplsrv_in_connection);
-       packet_queue_run(wreplconn->packet);
-}
-
-/*
-  handle socket recv errors
-*/
-static void wreplsrv_recv_error(void *private_data, NTSTATUS status)
-{
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(private_data,
-                                                                  struct wreplsrv_in_connection);
-       wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status));
-}
+static void wreplsrv_call_loop(struct tevent_req *subreq);
 
 /*
   called when we get a new connection
 
 /*
   called when we get a new connection
@@ -155,43 +102,240 @@ static void wreplsrv_recv_error(void *private_data, NTSTATUS status)
 static void wreplsrv_accept(struct stream_connection *conn)
 {
        struct wreplsrv_service *service = talloc_get_type(conn->private_data, struct wreplsrv_service);
 static void wreplsrv_accept(struct stream_connection *conn)
 {
        struct wreplsrv_service *service = talloc_get_type(conn->private_data, struct wreplsrv_service);
-       struct wreplsrv_in_connection *wreplconn;
+       struct wreplsrv_in_connection *wrepl_conn;
        struct socket_address *peer_ip;
        struct socket_address *peer_ip;
+       struct tevent_req *subreq;
+       int rc, fd;
 
 
-       wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
-       if (!wreplconn) {
-               stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
+       wrepl_conn = talloc_zero(conn, struct wreplsrv_in_connection);
+       if (wrepl_conn == NULL) {
+               stream_terminate_connection(conn,
+                                           "wreplsrv_accept: out of memory");
                return;
        }
 
                return;
        }
 
-       wreplconn->packet = packet_init(wreplconn);
-       if (!wreplconn->packet) {
-               wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory");
+       wrepl_conn->send_queue = tevent_queue_create(conn, "wrepl_accept");
+       if (wrepl_conn->send_queue == NULL) {
+               stream_terminate_connection(conn,
+                                           "wrepl_accept: out of memory");
                return;
        }
                return;
        }
-       packet_set_private(wreplconn->packet, wreplconn);
-       packet_set_socket(wreplconn->packet, conn->socket);
-       packet_set_callback(wreplconn->packet, wreplsrv_recv_request);
-       packet_set_full_request(wreplconn->packet, packet_full_request_u32);
-       packet_set_error_handler(wreplconn->packet, wreplsrv_recv_error);
-       packet_set_event_context(wreplconn->packet, conn->event.ctx);
-       packet_set_fde(wreplconn->packet, conn->event.fde);
-       packet_set_serialise(wreplconn->packet);
-
-       wreplconn->conn         = conn;
-       wreplconn->service      = service;
-
-       peer_ip = socket_get_peer_addr(conn->socket, wreplconn);
-       if (!peer_ip) {
-               wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: could not obtain peer IP from kernel");
+
+       TALLOC_FREE(conn->event.fde);
+
+       /*
+        * Clone the fd that the connection isn't closed if we create a client
+        * connection.
+        */
+       fd = dup(socket_get_fd(conn->socket));
+       if (fd == -1) {
+               char *reason;
+
+               reason = talloc_asprintf(conn,
+                                        "wrepl_accept: failed to duplicate the file descriptor - %s",
+                                        strerror(errno));
+               if (reason == NULL) {
+                       reason = strerror(errno);
+               }
+               stream_terminate_connection(conn, reason);
+       }
+       rc = tstream_bsd_existing_socket(wrepl_conn,
+                                        fd,
+                                        &wrepl_conn->tstream);
+       if (rc < 0) {
+               stream_terminate_connection(conn,
+                                           "wrepl_accept: out of memory");
                return;
        }
 
                return;
        }
 
-       wreplconn->partner      = wreplsrv_find_partner(service, peer_ip->addr);
+       wrepl_conn->conn = conn;
+       wrepl_conn->service = service;
 
 
-       conn->private_data = wreplconn;
+       peer_ip = socket_get_peer_addr(conn->socket, wrepl_conn);
+       if (peer_ip == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_accept: "
+                               "could not obtain peer IP from kernel");
+               return;
+       }
 
 
+       wrepl_conn->partner = wreplsrv_find_partner(service, peer_ip->addr);
        irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
        irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
+
+       /*
+        * The wrepl pdu's has the length as 4 byte (initial_read_size),
+        * packet_full_request_u32 provides the pdu length then.
+        */
+       subreq = tstream_read_pdu_blob_send(wrepl_conn,
+                                           wrepl_conn->conn->event.ctx,
+                                           wrepl_conn->tstream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           wrepl_conn);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wrepl_accept: "
+                               "no memory for tstream_read_pdu_blob_send");
+               return;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_loop, wrepl_conn);
+}
+
+static void wreplsrv_call_writev_done(struct tevent_req *subreq);
+
+static void wreplsrv_call_loop(struct tevent_req *subreq)
+{
+       struct wreplsrv_in_connection *wrepl_conn = tevent_req_callback_data(subreq,
+                                     struct wreplsrv_in_connection);
+       struct wreplsrv_in_call *call;
+       NTSTATUS status;
+
+       call = talloc_zero(wrepl_conn, struct wreplsrv_in_call);
+       if (call == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_call_loop: "
+                               "no memory for wrepl_samba3_call");
+               return;
+       }
+       call->wreplconn = wrepl_conn;
+
+       status = tstream_read_pdu_blob_recv(subreq,
+                                           call,
+                                           &call->in);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               const char *reason;
+
+               reason = talloc_asprintf(call, "wreplsrv_call_loop: "
+                                        "tstream_read_pdu_blob_recv() - %s",
+                                        nt_errstr(status));
+               if (!reason) {
+                       reason = nt_errstr(status);
+               }
+
+               wreplsrv_terminate_in_connection(wrepl_conn, reason);
+               return;
+       }
+
+       DEBUG(10,("Received wrepl packet of length %lu from %s\n",
+                (long) call->in.length,
+                tsocket_address_string(wrepl_conn->conn->remote_address, call)));
+
+       /* skip length header */
+       call->in.data += 4;
+       call->in.length -= 4;
+
+       status = wreplsrv_process(wrepl_conn, &call);
+       if (!NT_STATUS_IS_OK(status)) {
+               const char *reason;
+
+               reason = talloc_asprintf(call, "wreplsrv_call_loop: "
+                                        "tstream_read_pdu_blob_recv() - %s",
+                                        nt_errstr(status));
+               if (reason == NULL) {
+                       reason = nt_errstr(status);
+               }
+
+               wreplsrv_terminate_in_connection(wrepl_conn, reason);
+               return;
+       }
+
+       /* We handed over the connection so we're done here */
+       if (wrepl_conn->tstream == NULL) {
+           return;
+       }
+
+       /* Invalid WINS-Replication packet, we just ignore it */
+       if (call == NULL) {
+               goto noreply;
+       }
+
+       call->out_iov[0].iov_base = call->out.data;
+       call->out_iov[0].iov_len = call->out.length;
+
+       subreq = tstream_writev_queue_send(call,
+                                          wrepl_conn->conn->event.ctx,
+                                          wrepl_conn->tstream,
+                                          wrepl_conn->send_queue,
+                                          call->out_iov, 1);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_call_loop: "
+                               "no memory for tstream_writev_queue_send");
+               return;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_writev_done, call);
+
+noreply:
+       /*
+        * The wrepl pdu's has the length as 4 byte (initial_read_size),
+        *  provides the pdu length then.
+        */
+       subreq = tstream_read_pdu_blob_send(wrepl_conn,
+                                           wrepl_conn->conn->event.ctx,
+                                           wrepl_conn->tstream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           wrepl_conn);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_conn, "wreplsrv_call_loop: "
+                               "no memory for tstream_read_pdu_blob_send");
+               return;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_loop, wrepl_conn);
+}
+
+static void wreplsrv_call_writev_done(struct tevent_req *subreq)
+{
+       struct wreplsrv_in_call *call = tevent_req_callback_data(subreq,
+                       struct wreplsrv_in_call);
+       int sys_errno;
+       int rc;
+
+       rc = tstream_writev_queue_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (rc == -1) {
+               const char *reason;
+
+               reason = talloc_asprintf(call, "wreplsrv_call_writev_done: "
+                                        "tstream_writev_queue_recv() - %d:%s",
+                                        sys_errno, strerror(sys_errno));
+               if (reason == NULL) {
+                       reason = "wreplsrv_call_writev_done: "
+                                "tstream_writev_queue_recv() failed";
+               }
+
+               wreplsrv_terminate_in_connection(call->wreplconn, reason);
+               return;
+       }
+
+       if (call->terminate_after_send) {
+               wreplsrv_terminate_in_connection(call->wreplconn,
+                               "wreplsrv_in_connection: terminate_after_send");
+               return;
+       }
+
+       talloc_free(call);
+}
+
+/*
+  called on a tcp recv
+*/
+static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
+{
+       struct wreplsrv_in_connection *wrepl_conn = talloc_get_type(conn->private_data,
+                                                       struct wreplsrv_in_connection);
+       /* this should never be triggered! */
+       DEBUG(0,("Terminating connection - '%s'\n", "wrepl_recv: called"));
+       wreplsrv_terminate_in_connection(wrepl_conn, "wrepl_recv: called");
+}
+
+/*
+  called when we can write to a connection
+*/
+static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
+{
+       struct wreplsrv_in_connection *wrepl_conn = talloc_get_type(conn->private_data,
+                                                       struct wreplsrv_in_connection);
+       /* this should never be triggered! */
+       DEBUG(0,("Terminating connection - '%s'\n", "wrepl_send: called"));
+       wreplsrv_terminate_in_connection(wrepl_conn, "wrepl_send: called");
 }
 
 static const struct stream_server_ops wreplsrv_stream_ops = {
 }
 
 static const struct stream_server_ops wreplsrv_stream_ops = {
@@ -206,14 +350,15 @@ static const struct stream_server_ops wreplsrv_stream_ops = {
 */
 NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
                                      struct socket_context *sock,
 */
 NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
                                      struct socket_context *sock,
-                                     struct packet_context *packet,
                                      struct wreplsrv_in_connection **_wrepl_in)
 {
        struct wreplsrv_service *service = partner->service;
        struct wreplsrv_in_connection *wrepl_in;
        const struct model_ops *model_ops;
        struct stream_connection *conn;
                                      struct wreplsrv_in_connection **_wrepl_in)
 {
        struct wreplsrv_service *service = partner->service;
        struct wreplsrv_in_connection *wrepl_in;
        const struct model_ops *model_ops;
        struct stream_connection *conn;
+       struct tevent_req *subreq;
        NTSTATUS status;
        NTSTATUS status;
+       int rc;
 
        /* within the wrepl task we want to be a single process, so
           ask for the single process model ops and pass these to the
 
        /* within the wrepl task we want to be a single process, so
           ask for the single process model ops and pass these to the
@@ -230,9 +375,14 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
        wrepl_in->service       = service;
        wrepl_in->partner       = partner;
 
        wrepl_in->service       = service;
        wrepl_in->partner       = partner;
 
-       status = stream_new_connection_merge(service->task->event_ctx, service->task->lp_ctx, model_ops,
-                                            sock, &wreplsrv_stream_ops, service->task->msg_ctx,
-                                            wrepl_in, &conn);
+       status = stream_new_connection_merge(service->task->event_ctx,
+                                            service->task->lp_ctx,
+                                            model_ops,
+                                            sock,
+                                            &wreplsrv_stream_ops,
+                                            service->task->msg_ctx,
+                                            wrepl_in,
+                                            &conn);
        NT_STATUS_NOT_OK_RETURN(status);
 
        /*
        NT_STATUS_NOT_OK_RETURN(status);
 
        /*
@@ -242,20 +392,43 @@ NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
        wrepl_in->conn          = conn;
        talloc_steal(conn, wrepl_in);
 
        wrepl_in->conn          = conn;
        talloc_steal(conn, wrepl_in);
 
+       wrepl_in->send_queue = tevent_queue_create(wrepl_in, "wreplsrv_in_connection_merge");
+       if (wrepl_in->send_queue == NULL) {
+               stream_terminate_connection(conn,
+                                           "wreplsrv_in_connection_merge: out of memory");
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       TALLOC_FREE(conn->event.fde);
+
+       rc = tstream_bsd_existing_socket(wrepl_in,
+                                        socket_get_fd(sock),
+                                        &wrepl_in->tstream);
+       if (rc < 0) {
+               stream_terminate_connection(conn,
+                                           "wreplsrv_in_connection_merge: out of memory");
+               return NT_STATUS_NO_MEMORY;
+       }
+
        /*
        /*
-        * now update the packet handling callback,...
+        * The wrepl pdu's has the length as 4 byte (initial_read_size),
+        * packet_full_request_u32 provides the pdu length then.
         */
         */
-       wrepl_in->packet        = talloc_steal(wrepl_in, packet);
-       packet_set_private(wrepl_in->packet, wrepl_in);
-       packet_set_socket(wrepl_in->packet, conn->socket);
-       packet_set_callback(wrepl_in->packet, wreplsrv_recv_request);
-       packet_set_full_request(wrepl_in->packet, packet_full_request_u32);
-       packet_set_error_handler(wrepl_in->packet, wreplsrv_recv_error);
-       packet_set_event_context(wrepl_in->packet, conn->event.ctx);
-       packet_set_fde(wrepl_in->packet, conn->event.fde);
-       packet_set_serialise(wrepl_in->packet);
+       subreq = tstream_read_pdu_blob_send(wrepl_in,
+                                           wrepl_in->conn->event.ctx,
+                                           wrepl_in->tstream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           wrepl_in);
+       if (subreq == NULL) {
+               wreplsrv_terminate_in_connection(wrepl_in, "wreplsrv_in_connection_merge: "
+                               "no memory for tstream_read_pdu_blob_send");
+               return NT_STATUS_NO_MEMORY;
+       }
+       tevent_req_set_callback(subreq, wreplsrv_call_loop, wrepl_in);
 
        *_wrepl_in = wrepl_in;
 
        *_wrepl_in = wrepl_in;
+
        return NT_STATUS_OK;
 }
 
        return NT_STATUS_OK;
 }
 
index 6aff1340720393115c4e2da2a2aa17ffae0df7b3..7f460c56efad7b884425fcf66d6c43caa02500ae 100644 (file)
@@ -855,8 +855,6 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
        struct wreplsrv_in_connection *wrepl_in;
        NTSTATUS status;
        struct socket_context *sock;
        struct wreplsrv_in_connection *wrepl_in;
        NTSTATUS status;
        struct socket_context *sock;
-       struct packet_context *packet;
-       uint16_t fde_flags;
 
        /* prepare the outgoing request */
        req->opcode     = WREPL_OPCODE_BITS;
 
        /* prepare the outgoing request */
        req->opcode     = WREPL_OPCODE_BITS;
@@ -885,21 +883,12 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
        state->wreplconn->sock->sock = NULL;
        talloc_steal(state, sock);
 
        state->wreplconn->sock->sock = NULL;
        talloc_steal(state, sock);
 
-       /* 
-        * steal the packet_context
-        * note the request DATA_BLOB we just send on the
-        * wrepl_socket (client connection) is still unter the 
-        * packet context and will be send to the wire
-        */
-       packet = state->wreplconn->sock->packet;
-       state->wreplconn->sock->packet = NULL;
-       talloc_steal(state, packet);
-
        /*
        /*
-        * get the fde_flags of the old fde event,
-        * so that we can later set the same flags to the new one
+        * TODO: steal the tstream if we switch the client to tsocket.
+        * This is just to get a compiler error as soon as we remove
+        * packet_context.
         */
         */
-       fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
+       state->wreplconn->sock->packet = NULL;
 
        /*
         * free the wrepl_socket (client connection)
 
        /*
         * free the wrepl_socket (client connection)
@@ -915,17 +904,14 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
         *       wreplsrv_in_connection_merge()
         */
        status = wreplsrv_in_connection_merge(state->io->in.partner,
         *       wreplsrv_in_connection_merge()
         */
        status = wreplsrv_in_connection_merge(state->io->in.partner,
-                                             sock, packet, &wrepl_in);
+                                             sock, &wrepl_in);
        NT_STATUS_NOT_OK_RETURN(status);
 
        NT_STATUS_NOT_OK_RETURN(status);
 
-       event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
-
        wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
        wrepl_in->assoc_ctx.our_ctx     = 0;
 
        /* now we can free the wreplsrv_out_connection */
        wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
        wrepl_in->assoc_ctx.our_ctx     = 0;
 
        /* now we can free the wreplsrv_out_connection */
-       talloc_free(state->wreplconn);
-       state->wreplconn = NULL;
+       TALLOC_FREE(state->wreplconn);
 
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
 
 
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
 
index d92e524c353714a42e0af148c0116d1e9317a7fc..bb6c181040e521695e053632d07d36494f6dcc62 100644 (file)
@@ -19,6 +19,8 @@
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
+#include "system/network.h"
+
 struct wreplsrv_service;
 struct wreplsrv_in_connection;
 struct wreplsrv_out_connection;
 struct wreplsrv_service;
 struct wreplsrv_in_connection;
 struct wreplsrv_out_connection;
@@ -35,6 +37,10 @@ struct wreplsrv_in_call {
        struct wrepl_packet req_packet;
        struct wrepl_packet rep_packet;
        bool terminate_after_send;
        struct wrepl_packet req_packet;
        struct wrepl_packet rep_packet;
        bool terminate_after_send;
+
+       DATA_BLOB in;
+       DATA_BLOB out;
+       struct iovec out_iov[1];
 };
 
 /*
 };
 
 /*
@@ -43,7 +49,8 @@ struct wreplsrv_in_call {
 struct wreplsrv_in_connection {
        struct wreplsrv_in_connection *prev,*next;
        struct stream_connection *conn;
 struct wreplsrv_in_connection {
        struct wreplsrv_in_connection *prev,*next;
        struct stream_connection *conn;
-       struct packet_context *packet;
+       struct tstream_context *tstream;
+       struct tevent_queue *send_queue;
 
        /* our global service context */
        struct wreplsrv_service *service;
 
        /* our global service context */
        struct wreplsrv_service *service;