r26638: libndr: Require explicitly specifying iconv_convenience for ndr_struct_push_b...
[ira/wip.git] / source4 / libcli / wrepl / winsrepl.c
index 61cf633b54566313435110961bb7f5246c99d06b..381df902b40e9a6e1b78ab85bc1ffb560e6fb5c2 100644 (file)
@@ -7,7 +7,7 @@
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/events/events.h"
-#include "dlinklist.h"
+#include "lib/util/dlinklist.h"
 #include "lib/socket/socket.h"
 #include "libcli/wrepl/winsrepl.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "lib/stream/packet.h"
+#include "libcli/composite/composite.h"
+#include "system/network.h"
+#include "lib/socket/netif.h"
+#include "param/param.h"
+#include "libcli/resolve/resolve.h"
+
+static struct wrepl_request *wrepl_request_finished(struct wrepl_request *req, NTSTATUS status);
 
 /*
   mark all pending requests as dead - called when a socket error happens
 */
 static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket, NTSTATUS status)
 {
-       talloc_set_destructor(wrepl_socket, NULL);
-       wrepl_socket->dead = True;
+       wrepl_socket->dead = true;
+
+       if (wrepl_socket->packet) {
+               packet_recv_disable(wrepl_socket->packet);
+               packet_set_fde(wrepl_socket->packet, NULL);
+               packet_set_socket(wrepl_socket->packet, NULL);
+       }
 
-       if (wrepl_socket->fde) {
-               talloc_free(wrepl_socket->fde);
-               wrepl_socket->fde = NULL;
+       if (wrepl_socket->event.fde) {
+               talloc_free(wrepl_socket->event.fde);
+               wrepl_socket->event.fde = NULL;
        }
 
        if (wrepl_socket->sock) {
@@ -47,23 +60,15 @@ static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket, NTSTATUS status
        if (NT_STATUS_EQUAL(NT_STATUS_UNSUCCESSFUL, status)) {
                status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
        }
-       while (wrepl_socket->send_queue) {
-               struct wrepl_request *req = wrepl_socket->send_queue;
-               DLIST_REMOVE(wrepl_socket->send_queue, req);
-               req->state = WREPL_REQUEST_ERROR;
-               req->status = status;
-               if (req->async.fn) {
-                       req->async.fn(req);
-               }
-       }
        while (wrepl_socket->recv_queue) {
                struct wrepl_request *req = wrepl_socket->recv_queue;
                DLIST_REMOVE(wrepl_socket->recv_queue, req);
-               req->state = WREPL_REQUEST_ERROR;
-               req->status = status;
-               if (req->async.fn) {
-                       req->async.fn(req);
-               }
+               wrepl_request_finished(req, status);
+       }
+
+       talloc_set_destructor(wrepl_socket, NULL);
+       if (wrepl_socket->free_skipped) {
+               talloc_free(wrepl_socket);
        }
 }
 
@@ -74,180 +79,48 @@ static void wrepl_request_timeout_handler(struct event_context *ev, struct timed
        wrepl_socket_dead(req->wrepl_socket, NT_STATUS_IO_TIMEOUT);
 }
 
-/*
-  handle send events 
-*/
-static void wrepl_handler_send(struct wrepl_socket *wrepl_socket)
-{
-       while (wrepl_socket->send_queue) {
-               struct wrepl_request *req = wrepl_socket->send_queue;
-               size_t nsent;
-               NTSTATUS status;
-
-               status = socket_send(wrepl_socket->sock, &req->buffer, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       wrepl_socket_dead(wrepl_socket, status);
-                       return;
-               }
-               if (!NT_STATUS_IS_OK(status) || nsent == 0) return;
-
-               req->buffer.data   += nsent;
-               req->buffer.length -= nsent;
-               if (req->buffer.length != 0) {
-                       return;
-               }
-
-               if (req->disconnect_after_send) {
-                       DLIST_REMOVE(wrepl_socket->send_queue, req);
-                       req->status = NT_STATUS_OK;
-                       req->state = WREPL_REQUEST_DONE;
-                       wrepl_socket_dead(wrepl_socket, NT_STATUS_LOCAL_DISCONNECT);
-                       if (req->async.fn) {
-                               req->async.fn(req);
-                       }
-                       return;
-               }
-
-               if (req->send_only) {
-                       DLIST_REMOVE(wrepl_socket->send_queue, req);
-                       req->status = NT_STATUS_OK;
-                       req->state = WREPL_REQUEST_DONE;
-                       if (req->async.fn) {
-                               EVENT_FD_READABLE(wrepl_socket->fde);
-                               req->async.fn(req);
-                               return;
-                       }
-               } else {
-                       DLIST_REMOVE(wrepl_socket->send_queue, req);
-                       DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *);
-                       req->state = WREPL_REQUEST_RECV;
-               }
-
-               EVENT_FD_READABLE(wrepl_socket->fde);
-       }
-
-       EVENT_FD_NOT_WRITEABLE(wrepl_socket->fde);
-}
-
-
 /*
   handle recv events 
 */
-static void wrepl_handler_recv(struct wrepl_socket *wrepl_socket)
+static NTSTATUS wrepl_finish_recv(void *private, DATA_BLOB packet_blob_in)
 {
-       size_t nread;
+       struct wrepl_socket *wrepl_socket = talloc_get_type(private, struct wrepl_socket);
        struct wrepl_request *req = wrepl_socket->recv_queue;
        DATA_BLOB blob;
+       enum ndr_err_code ndr_err;
 
-       if (req == NULL) {
-               NTSTATUS status;
-
-               EVENT_FD_NOT_READABLE(wrepl_socket->fde);
-
-               status = socket_recv(wrepl_socket->sock, NULL, 0, &nread, 0);
-               if (NT_STATUS_EQUAL(NT_STATUS_END_OF_FILE,status)) return;
-               if (NT_STATUS_IS_ERR(status)) {
-                       wrepl_socket_dead(wrepl_socket, status);
-                       return;
-               }
-               return;
+       if (!req) {
+               DEBUG(1,("Received unexpected WINS packet of length %u!\n", 
+                        (unsigned)packet_blob_in.length));
+               return NT_STATUS_INVALID_NETWORK_RESPONSE;
        }
 
-       if (req->buffer.length == 0) {
-               req->buffer = data_blob_talloc(req, NULL, 4);
-               if (req->buffer.data == NULL) {
-                       req->status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               req->num_read = 0;
-       }
-
-       /* read in the packet length */
-       if (req->num_read < 4) {
-               uint32_t req_length;
-
-               req->status = socket_recv(wrepl_socket->sock, 
-                                         req->buffer.data + req->num_read,
-                                         4 - req->num_read,
-                                         &nread, 0);
-               if (NT_STATUS_IS_ERR(req->status)) {
-                       wrepl_socket_dead(wrepl_socket, req->status);
-                       return;
-               }
-               if (!NT_STATUS_IS_OK(req->status)) return;
-
-               req->num_read += nread;
-               if (req->num_read != 4) return;
-
-               req_length = RIVAL(req->buffer.data, 0) + 4;
-
-               req->buffer.data = talloc_realloc(req, req->buffer.data, 
-                                                 uint8_t, req_length);
-               if (req->buffer.data == NULL) {
-                       req->status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               req->buffer.length = req_length;
-       }
-
-       /* read in the body */
-       req->status = socket_recv(wrepl_socket->sock, 
-                                 req->buffer.data + req->num_read,
-                                 req->buffer.length - req->num_read,
-                                 &nread, 0);
-       if (NT_STATUS_IS_ERR(req->status)) {
-               wrepl_socket_dead(wrepl_socket, req->status);
-               return;
-       }
-       if (!NT_STATUS_IS_OK(req->status)) return;
-
-       req->num_read += nread;
-       if (req->num_read != req->buffer.length) return;
-
        req->packet = talloc(req, struct wrepl_packet);
-       if (req->packet == NULL) {
-               req->status = NT_STATUS_NO_MEMORY;
-               goto failed;
-       }
+       NT_STATUS_HAVE_NO_MEMORY(req->packet);
 
-       blob.data = req->buffer.data + 4;
-       blob.length = req->buffer.length - 4;
+       blob.data = packet_blob_in.data + 4;
+       blob.length = packet_blob_in.length - 4;
        
        /* we have a full request - parse it */
-       req->status = ndr_pull_struct_blob(&blob,
-                                          req->packet, req->packet,
-                                          (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               DEBUG(2,("Failed to parse incoming WINS packet - %s\n",
-                        nt_errstr(req->status)));
-               DEBUG(10,("packet length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, req->packet);
-               goto failed;
+       ndr_err = ndr_pull_struct_blob(&blob,
+                                      req->packet, req->packet,
+                                      (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               NTSTATUS status = ndr_map_error2ntstatus(ndr_err);
+               wrepl_request_finished(req, status);
+               return NT_STATUS_OK;
        }
 
        if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS packet of length %d\n", (int)req->buffer.length));
+               DEBUG(10,("Received WINS packet of length %u\n", 
+                         (unsigned)packet_blob_in.length));
                NDR_PRINT_DEBUG(wrepl_packet, req->packet);
        }
 
-       DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       req->state = WREPL_REQUEST_DONE;
-       if (req->async.fn) {
-               req->async.fn(req);
-       }
-       return;
-
-failed:
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       }
-       req->state = WREPL_REQUEST_ERROR;
-       if (req->async.fn) {
-               req->async.fn(req);
-       }
+       wrepl_request_finished(req, NT_STATUS_OK);
+       return NT_STATUS_OK;
 }
 
-
 /*
   handler for winrepl events
 */
@@ -256,62 +129,32 @@ static void wrepl_handler(struct event_context *ev, struct fd_event *fde,
 {
        struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
                                                            struct wrepl_socket);
-       if (flags & EVENT_FD_WRITE) {
-               wrepl_handler_send(wrepl_socket);
-       }
        if (flags & EVENT_FD_READ) {
-               wrepl_handler_recv(wrepl_socket);
+               packet_recv(wrepl_socket->packet);
+               return;
+       }
+       if (flags & EVENT_FD_WRITE) {
+               packet_queue_run(wrepl_socket->packet);
        }
 }
 
-
-/*
-  handler for winrepl connection completion
-*/
-static void wrepl_connect_handler(struct event_context *ev, struct fd_event *fde, 
-                                 uint16_t flags, void *private)
+static void wrepl_error(void *private, NTSTATUS status)
 {
        struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
                                                            struct wrepl_socket);
-       struct wrepl_request *req = wrepl_socket->recv_queue;
-
-       talloc_free(wrepl_socket->fde);
-       wrepl_socket->fde = NULL;
-
-       if (req == NULL) return;
-
-       req->status = socket_connect_complete(wrepl_socket->sock, 0);
-       if (NT_STATUS_IS_ERR(req->status)) goto failed;
-
-       if (!NT_STATUS_IS_OK(req->status)) return;
-
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, 
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        EVENT_FD_WRITE,
-                                        wrepl_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
-               req->status = NT_STATUS_NO_MEMORY;
-       }
-
-
-failed:
-       DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               req->state = WREPL_REQUEST_ERROR;
-       } else {
-               req->state = WREPL_REQUEST_DONE;
-       }
-       if (req->async.fn) {
-               req->async.fn(req);
-       }
+       wrepl_socket_dead(wrepl_socket, status);
 }
 
+
 /*
   destroy a wrepl_socket destructor
 */
-static int wrepl_socket_destructor(void *ptr)
+static int wrepl_socket_destructor(struct wrepl_socket *sock)
 {
-       struct wrepl_socket *sock = talloc_get_type(ptr, struct wrepl_socket);
+       if (sock->dead) {
+               sock->free_skipped = true;
+               return -1;
+       }
        wrepl_socket_dead(sock, NT_STATUS_LOCAL_DISCONNECT);
        return 0;
 }
@@ -326,35 +169,22 @@ struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx,
        struct wrepl_socket *wrepl_socket;
        NTSTATUS status;
 
-       wrepl_socket = talloc(mem_ctx, struct wrepl_socket);
-       if (wrepl_socket == NULL) goto failed;
+       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
+       if (!wrepl_socket) return NULL;
 
        if (event_ctx == NULL) {
-               wrepl_socket->event_ctx = event_context_init(wrepl_socket);
+               wrepl_socket->event.ctx = event_context_init(wrepl_socket);
        } else {
-               wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx);
+               wrepl_socket->event.ctx = talloc_reference(wrepl_socket, event_ctx);
        }
-       if (wrepl_socket->event_ctx == NULL) goto failed;
+       if (!wrepl_socket->event.ctx) goto failed;
 
        status = socket_create("ip", SOCKET_TYPE_STREAM, &wrepl_socket->sock, 0);
        if (!NT_STATUS_IS_OK(status)) goto failed;
 
        talloc_steal(wrepl_socket, wrepl_socket->sock);
 
-       wrepl_socket->send_queue        = NULL;
-       wrepl_socket->recv_queue        = NULL;
        wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
-       wrepl_socket->dead              = False;
-
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, 
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        EVENT_FD_WRITE,
-                                        wrepl_connect_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
-               goto failed;
-       }
-
-       set_blocking(socket_get_fd(wrepl_socket->sock), False);
 
        talloc_set_destructor(wrepl_socket, wrepl_socket_destructor);
 
@@ -370,32 +200,42 @@ failed:
 */
 struct wrepl_socket *wrepl_socket_merge(TALLOC_CTX *mem_ctx, 
                                        struct event_context *event_ctx,
-                                       struct socket_context *socket)
+                                       struct socket_context *sock,
+                                       struct packet_context *pack)
 {
        struct wrepl_socket *wrepl_socket;
 
-       wrepl_socket = talloc(mem_ctx, struct wrepl_socket);
+       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
        if (wrepl_socket == NULL) goto failed;
 
-       wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx);
-       if (wrepl_socket->event_ctx == NULL) goto failed;
+       wrepl_socket->event.ctx = talloc_reference(wrepl_socket, event_ctx);
+       if (wrepl_socket->event.ctx == NULL) goto failed;
 
-       wrepl_socket->sock = socket;
+       wrepl_socket->sock = sock;
        talloc_steal(wrepl_socket, wrepl_socket->sock);
 
-       wrepl_socket->send_queue        = NULL;
-       wrepl_socket->recv_queue        = NULL;
+
        wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
-       wrepl_socket->dead              = False;
 
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket,
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        0,
-                                        wrepl_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
+       wrepl_socket->event.fde = event_add_fd(wrepl_socket->event.ctx, wrepl_socket,
+                                              socket_get_fd(wrepl_socket->sock), 
+                                              EVENT_FD_READ,
+                                              wrepl_handler, wrepl_socket);
+       if (wrepl_socket->event.fde == NULL) {
                goto failed;
        }
 
+       wrepl_socket->packet = pack;
+       talloc_steal(wrepl_socket, wrepl_socket->packet);
+       packet_set_private(wrepl_socket->packet, wrepl_socket);
+       packet_set_socket(wrepl_socket->packet, wrepl_socket->sock);
+       packet_set_callback(wrepl_socket->packet, wrepl_finish_recv);
+       packet_set_full_request(wrepl_socket->packet, packet_full_request_u32);
+       packet_set_error_handler(wrepl_socket->packet, wrepl_error);
+       packet_set_event_context(wrepl_socket->packet, wrepl_socket->event.ctx);
+       packet_set_fde(wrepl_socket->packet, wrepl_socket->event.fde);
+       packet_set_serialise(wrepl_socket->packet);
+
        talloc_set_destructor(wrepl_socket, wrepl_socket_destructor);
        
        return wrepl_socket;
@@ -408,12 +248,8 @@ failed:
 /*
   destroy a wrepl_request
 */
-static int wrepl_request_destructor(void *ptr)
+static int wrepl_request_destructor(struct wrepl_request *req)
 {
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       if (req->state == WREPL_REQUEST_SEND) {
-               DLIST_REMOVE(req->wrepl_socket->send_queue, req);
-       }
        if (req->state == WREPL_REQUEST_RECV) {
                DLIST_REMOVE(req->wrepl_socket->recv_queue, req);
        }
@@ -428,69 +264,121 @@ static NTSTATUS wrepl_request_wait(struct wrepl_request *req)
 {
        NT_STATUS_HAVE_NO_MEMORY(req);
        while (req->state < WREPL_REQUEST_DONE) {
-               event_loop_once(req->wrepl_socket->event_ctx);
+               event_loop_once(req->wrepl_socket->event.ctx);
        }
        return req->status;
 }
 
-static void wrepl_request_trigger(struct wrepl_request *req);
+struct wrepl_connect_state {
+       struct composite_context *result;
+       struct wrepl_socket *wrepl_socket;
+       struct composite_context *creq;
+};
 
 /*
-  connect a wrepl_socket to a WINS server
+  handler for winrepl connection completion
 */
-struct wrepl_request *wrepl_connect_send(struct wrepl_socket *wrepl_socket,
-                                        const char *our_ip, const char *peer_ip)
+static void wrepl_connect_handler(struct composite_context *creq)
 {
-       struct wrepl_request *req;
-       NTSTATUS status;
+       struct wrepl_connect_state *state = talloc_get_type(creq->async.private_data, 
+                                           struct wrepl_connect_state);
+       struct wrepl_socket *wrepl_socket = state->wrepl_socket;
+       struct composite_context *result = state->result;
+
+       result->status = socket_connect_recv(state->creq);
+       if (!composite_is_ok(result)) return;
+
+       wrepl_socket->event.fde = event_add_fd(wrepl_socket->event.ctx, wrepl_socket, 
+                                              socket_get_fd(wrepl_socket->sock), 
+                                              EVENT_FD_READ,
+                                              wrepl_handler, wrepl_socket);
+       if (composite_nomem(wrepl_socket->event.fde, result)) return;
+
+       /* setup the stream -> packet parser */
+       wrepl_socket->packet = packet_init(wrepl_socket);
+       if (composite_nomem(wrepl_socket->packet, result)) return;
+       packet_set_private(wrepl_socket->packet, wrepl_socket);
+       packet_set_socket(wrepl_socket->packet, wrepl_socket->sock);
+       packet_set_callback(wrepl_socket->packet, wrepl_finish_recv);
+       packet_set_full_request(wrepl_socket->packet, packet_full_request_u32);
+       packet_set_error_handler(wrepl_socket->packet, wrepl_error);
+       packet_set_event_context(wrepl_socket->packet, wrepl_socket->event.ctx);
+       packet_set_fde(wrepl_socket->packet, wrepl_socket->event.fde);
+       packet_set_serialise(wrepl_socket->packet);
+
+       composite_done(result);
+}
 
-       req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (req == NULL) goto failed;
+/*
+  connect a wrepl_socket to a WINS server
+*/
+struct composite_context *wrepl_connect_send(struct wrepl_socket *wrepl_socket,
+                                            struct resolve_context *resolve_ctx,
+                                            const char *our_ip, const char *peer_ip)
+{
+       struct composite_context *result;
+       struct wrepl_connect_state *state;
+       struct socket_address *peer, *us;
 
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_RECV;
+       result = talloc_zero(wrepl_socket, struct composite_context);
+       if (!result) return NULL;
 
-       DLIST_ADD(wrepl_socket->recv_queue, req);
+       result->state           = COMPOSITE_STATE_IN_PROGRESS;
+       result->event_ctx       = wrepl_socket->event.ctx;
 
-       talloc_set_destructor(req, wrepl_request_destructor);
+       state = talloc_zero(result, struct wrepl_connect_state);
+       if (composite_nomem(state, result)) return result;
+       result->private_data    = state;
+       state->result           = result;
+       state->wrepl_socket     = wrepl_socket;
 
        if (!our_ip) {
-               our_ip = iface_best_ip(peer_ip);
+               struct interface *ifaces;
+               load_interfaces(state, lp_interfaces(global_loadparm), &ifaces);
+               our_ip = iface_best_ip(ifaces, peer_ip);
        }
 
-       status = socket_connect(wrepl_socket->sock, our_ip, 0, peer_ip, 
-                               WINS_REPLICATION_PORT, 0);
-       if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
-               req->wrepl_socket = wrepl_socket;
-               req->state        = WREPL_REQUEST_ERROR;
-               req->status       = status;
-               wrepl_request_trigger(req);
-               return req;
-       }
+       us = socket_address_from_strings(state, wrepl_socket->sock->backend_name, 
+                                        our_ip, 0);
+       if (composite_nomem(us, result)) return result;
 
-       return req;
+       peer = socket_address_from_strings(state, wrepl_socket->sock->backend_name, 
+                                          peer_ip, WINS_REPLICATION_PORT);
+       if (composite_nomem(peer, result)) return result;
 
-failed:
-       talloc_free(req);
-       return NULL;
+       state->creq = socket_connect_send(wrepl_socket->sock, us, peer,
+                                         0, resolve_ctx,
+                                         wrepl_socket->event.ctx);
+       composite_continue(result, state->creq, wrepl_connect_handler, state);
+       return result;
 }
 
 /*
   connect a wrepl_socket to a WINS server - recv side
 */
-NTSTATUS wrepl_connect_recv(struct wrepl_request *req)
+NTSTATUS wrepl_connect_recv(struct composite_context *result)
 {
-       return wrepl_request_wait(req);
-}
+       struct wrepl_connect_state *state = talloc_get_type(result->private_data,
+                                           struct wrepl_connect_state);
+       struct wrepl_socket *wrepl_socket = state->wrepl_socket;
+       NTSTATUS status = composite_wait(result);
 
+       if (!NT_STATUS_IS_OK(status)) {
+               wrepl_socket_dead(wrepl_socket, status);
+       }
+
+       talloc_free(result);
+       return status;
+}
 
 /*
   connect a wrepl_socket to a WINS server - sync API
 */
-NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket, const char *our_ip, const char *peer_ip)
+NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket, struct resolve_context *resolve_ctx,
+                      const char *our_ip, const char *peer_ip)
 {
-       struct wrepl_request *req = wrepl_connect_send(wrepl_socket, our_ip, peer_ip);
-       return wrepl_connect_recv(req);
+       struct composite_context *c_req = wrepl_connect_send(wrepl_socket, resolve_ctx, our_ip, peer_ip);
+       return wrepl_connect_recv(c_req);
 }
 
 /* 
@@ -507,66 +395,141 @@ static void wrepl_request_trigger_handler(struct event_context *ev, struct timed
 
 /*
   trigger an immediate event on a wrepl_request
+  the return value should only be used in wrepl_request_send()
+  this is the only place where req->trigger is true
 */
-static void wrepl_request_trigger(struct wrepl_request *req)
+static struct wrepl_request *wrepl_request_finished(struct wrepl_request *req, NTSTATUS status)
 {
-       /* a zero timeout means immediate */
-       event_add_timed(req->wrepl_socket->event_ctx,
-                       req, timeval_zero(),
-                       wrepl_request_trigger_handler, req);
+       struct timed_event *te;
+
+       if (req->state == WREPL_REQUEST_RECV) {
+               DLIST_REMOVE(req->wrepl_socket->recv_queue, req);
+       }
+
+       if (!NT_STATUS_IS_OK(status)) {
+               req->state      = WREPL_REQUEST_ERROR;
+       } else {
+               req->state      = WREPL_REQUEST_DONE;
+       }
+
+       req->status     = status;
+
+       if (req->trigger) {
+               req->trigger = false;
+               /* a zero timeout means immediate */
+               te = event_add_timed(req->wrepl_socket->event.ctx,
+                                    req, timeval_zero(),
+                                    wrepl_request_trigger_handler, req);
+               if (!te) {
+                       talloc_free(req);
+                       return NULL;
+               }
+               return req;
+       }
+
+       if (req->async.fn) {
+               req->async.fn(req);
+       }
+       return NULL;
 }
 
+struct wrepl_send_ctrl_state {
+       struct wrepl_send_ctrl ctrl;
+       struct wrepl_request *req;
+       struct wrepl_socket *wrepl_sock;
+};
+
+static int wrepl_send_ctrl_destructor(struct wrepl_send_ctrl_state *s)
+{
+       struct wrepl_request *req = s->wrepl_sock->recv_queue;
+
+       /* check if the request is still in WREPL_STATE_RECV,
+        * we need this here because the caller has may called 
+        * talloc_free(req) and wrepl_send_ctrl_state isn't
+        * a talloc child of the request, so our s->req pointer
+        * is maybe invalid!
+        */
+       for (; req; req = req->next) {
+               if (req == s->req) break;
+       }
+       if (!req) return 0;
+
+       /* here, we need to make sure the async request handler is called
+        * later in the next event_loop and now now
+        */
+       req->trigger = true;
+       wrepl_request_finished(req, NT_STATUS_OK);
+
+       if (s->ctrl.disconnect_after_send) {
+               wrepl_socket_dead(s->wrepl_sock, NT_STATUS_LOCAL_DISCONNECT);
+       }
+
+       return 0;
+}
 
 /*
   send a generic wins replication request
 */
 struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket,
-                                        struct wrepl_packet *packet)
+                                        struct wrepl_packet *packet,
+                                        struct wrepl_send_ctrl *ctrl)
 {
        struct wrepl_request *req;
        struct wrepl_wrap wrap;
+       DATA_BLOB blob;
+       NTSTATUS status;
+       enum ndr_err_code ndr_err;
 
        req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (req == NULL) goto failed;
+       if (!req) return NULL;
+       req->wrepl_socket = wrepl_socket;
+       req->state        = WREPL_REQUEST_RECV;
+       req->trigger      = true;
+
+       DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *);
+       talloc_set_destructor(req, wrepl_request_destructor);
 
        if (wrepl_socket->dead) {
-               req->wrepl_socket = wrepl_socket;
-               req->state        = WREPL_REQUEST_ERROR;
-               req->status       = NT_STATUS_INVALID_CONNECTION;
-               wrepl_request_trigger(req);
-               return req;
+               return wrepl_request_finished(req, NT_STATUS_INVALID_CONNECTION);
        }
 
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_SEND;
-
        wrap.packet = *packet;
-       req->status = ndr_push_struct_blob(&req->buffer, req, &wrap,
-                                          (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
-       if (!NT_STATUS_IS_OK(req->status)) goto failed;
+       ndr_err = ndr_push_struct_blob(&blob, req, lp_iconv_convenience(global_loadparm), &wrap, 
+                                      (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               return wrepl_request_finished(req, status);
+       }
 
        if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS packet of length %d\n", (int)req->buffer.length));
+               DEBUG(10,("Sending WINS packet of length %u\n", 
+                         (unsigned)blob.length));
                NDR_PRINT_DEBUG(wrepl_packet, &wrap.packet);
        }
 
-       DLIST_ADD(wrepl_socket->send_queue, req);
-
-       talloc_set_destructor(req, wrepl_request_destructor);
-
        if (wrepl_socket->request_timeout > 0) {
-               req->te = event_add_timed(wrepl_socket->event_ctx, req, 
+               req->te = event_add_timed(wrepl_socket->event.ctx, req, 
                                          timeval_current_ofs(wrepl_socket->request_timeout, 0), 
                                          wrepl_request_timeout_handler, req);
+               if (!req->te) return wrepl_request_finished(req, NT_STATUS_NO_MEMORY);
        }
 
-       EVENT_FD_WRITEABLE(wrepl_socket->fde);
-       
-       return req;
+       if (ctrl && (ctrl->send_only || ctrl->disconnect_after_send)) {
+               struct wrepl_send_ctrl_state *s = talloc(blob.data, struct wrepl_send_ctrl_state);
+               if (!s) return wrepl_request_finished(req, NT_STATUS_NO_MEMORY);
+               s->ctrl         = *ctrl;
+               s->req          = req;
+               s->wrepl_sock   = wrepl_socket;
+               talloc_set_destructor(s, wrepl_send_ctrl_destructor);
+       }
 
-failed:
-       talloc_free(req);
-       return NULL;
+       status = packet_send(wrepl_socket->packet, blob);
+       if (!NT_STATUS_IS_OK(status)) {
+               return wrepl_request_finished(req, status);
+       }
+
+       req->trigger = false;
+       return req;
 }
 
 /*
@@ -577,7 +540,7 @@ NTSTATUS wrepl_request_recv(struct wrepl_request *req,
                            struct wrepl_packet **packet)
 {
        NTSTATUS status = wrepl_request_wait(req);
-       if (NT_STATUS_IS_OK(status)) {
+       if (NT_STATUS_IS_OK(status) && packet) {
                *packet = talloc_steal(mem_ctx, req->packet);
        }
        talloc_free(req);
@@ -592,7 +555,7 @@ NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
                       struct wrepl_packet *req_packet,
                       struct wrepl_packet **reply_packet)
 {
-       struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet);
+       struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet, NULL);
        return wrepl_request_recv(req, mem_ctx, reply_packet);
 }
 
@@ -614,7 +577,22 @@ struct wrepl_request *wrepl_associate_send(struct wrepl_socket *wrepl_socket,
        packet->message.start.minor_version = 2;
        packet->message.start.major_version = 5;
 
-       req = wrepl_request_send(wrepl_socket, packet);
+       /*
+        * nt4 uses 41 bytes for the start_association call
+        * so do it the same and as we don't know th emeanings of this bytes
+        * we just send zeros and nt4, w2k and w2k3 seems to be happy with this
+        *
+        * if we don't do this nt4 uses an old version of the wins replication protocol
+        * and that would break nt4 <-> samba replication
+        */
+       packet->padding = data_blob_talloc(packet, NULL, 21);
+       if (packet->padding.data == NULL) {
+               talloc_free(packet);
+               return NULL;
+       }
+       memset(packet->padding.data, 0, packet->padding.length);
+
+       req = wrepl_request_send(wrepl_socket, packet, NULL);
 
        talloc_free(packet);
 
@@ -660,6 +638,7 @@ struct wrepl_request *wrepl_associate_stop_send(struct wrepl_socket *wrepl_socke
 {
        struct wrepl_packet *packet;
        struct wrepl_request *req;
+       struct wrepl_send_ctrl ctrl;
 
        packet = talloc_zero(wrepl_socket, struct wrepl_packet);
        if (packet == NULL) return NULL;
@@ -669,13 +648,14 @@ struct wrepl_request *wrepl_associate_stop_send(struct wrepl_socket *wrepl_socke
        packet->mess_type               = WREPL_STOP_ASSOCIATION;
        packet->message.stop.reason     = io->in.reason;
 
-       req = wrepl_request_send(wrepl_socket, packet);
-
-       if (req && io->in.reason == 0) {
-               req->send_only                  = True;
-               req->disconnect_after_send      = True;
+       ZERO_STRUCT(ctrl);
+       if (io->in.reason == 0) {
+               ctrl.send_only                  = true;
+               ctrl.disconnect_after_send      = true;
        }
 
+       req = wrepl_request_send(wrepl_socket, packet, &ctrl);
+
        talloc_free(packet);
 
        return req;     
@@ -722,7 +702,7 @@ struct wrepl_request *wrepl_pull_table_send(struct wrepl_socket *wrepl_socket,
        packet->mess_type                   = WREPL_REPLICATION;
        packet->message.replication.command = WREPL_REPL_TABLE_QUERY;
 
-       req = wrepl_request_send(wrepl_socket, packet);
+       req = wrepl_request_send(wrepl_socket, packet, NULL);
 
        talloc_free(packet);
 
@@ -794,7 +774,7 @@ struct wrepl_request *wrepl_pull_names_send(struct wrepl_socket *wrepl_socket,
        packet->message.replication.command    = WREPL_REPL_SEND_REQUEST;
        packet->message.replication.info.owner = io->in.partner;
 
-       req = wrepl_request_send(wrepl_socket, packet);
+       req = wrepl_request_send(wrepl_socket, packet, NULL);
 
        talloc_free(packet);