Finish removal of iconv_convenience in public API's.
[bbaumbach/samba-autobuild/.git] / source4 / libcli / wrepl / winsrepl.c
index 516c72ef29e9672fd260d411d7ba5389ae9f4dac..75cb34f6327dfa6387d6628bb8e13d7ccee81c57 100644 (file)
@@ -4,10 +4,11 @@
    low level WINS replication client code
 
    Copyright (C) Andrew Tridgell 2005
+   Copyright (C) Stefan Metzmacher 2005-2010
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/events/events.h"
-#include "dlinklist.h"
-#include "lib/socket/socket.h"
+#include "../lib/util/dlinklist.h"
 #include "libcli/wrepl/winsrepl.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "lib/stream/packet.h"
+#include "system/network.h"
+#include "lib/socket/netif.h"
+#include "param/param.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "lib/tsocket/tsocket.h"
+#include "libcli/util/tstream.h"
 
 /*
-  mark all pending requests as dead - called when a socket error happens
+  main context structure for the wins replication client library
 */
-static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket)
-{
-       event_set_fd_flags(wrepl_socket->fde, 0);
+struct wrepl_socket {
+       struct {
+               struct tevent_context *ctx;
+       } event;
 
-       while (wrepl_socket->send_queue) {
-               struct wrepl_request *req = wrepl_socket->send_queue;
-               DLIST_REMOVE(wrepl_socket->send_queue, req);
-               req->state = WREPL_REQUEST_ERROR;
-               req->status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-               if (req->async.fn) {
-                       req->async.fn(req);
-               }
+       /* the default timeout for requests, 0 means no timeout */
+#define WREPL_SOCKET_REQUEST_TIMEOUT   (60)
+       uint32_t request_timeout;
+
+       struct tevent_queue *request_queue;
+
+       struct tstream_context *stream;
+};
+
+bool wrepl_socket_is_connected(struct wrepl_socket *wrepl_sock)
+{
+       if (!wrepl_sock) {
+               return false;
        }
-       while (wrepl_socket->recv_queue) {
-               struct wrepl_request *req = wrepl_socket->recv_queue;
-               DLIST_REMOVE(wrepl_socket->recv_queue, req);
-               req->state = WREPL_REQUEST_ERROR;
-               req->status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-               if (req->async.fn) {
-                       req->async.fn(req);
-               }
+
+       if (!wrepl_sock->stream) {
+               return false;
        }
+
+       return true;
 }
 
 /*
-  handle send events 
+  initialise a wrepl_socket. The event_ctx is optional, if provided then
+  operations will use that event context
 */
-static void wrepl_handler_send(struct wrepl_socket *wrepl_socket)
+struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *event_ctx)
 {
-       while (wrepl_socket->send_queue) {
-               struct wrepl_request *req = wrepl_socket->send_queue;
-               size_t nsent;
-               NTSTATUS status;
-
-               status = socket_send(wrepl_socket->sock, &req->buffer, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       wrepl_socket_dead(wrepl_socket);
-                       return;
-               }
-               if (!NT_STATUS_IS_OK(status) || nsent == 0) return;
+       struct wrepl_socket *wrepl_socket;
 
-               req->buffer.data   += nsent;
-               req->buffer.length -= nsent;
-               if (req->buffer.length != 0) {
-                       return;
-               }
+       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
+       if (!wrepl_socket) {
+               return NULL;
+       }
 
-               DLIST_REMOVE(wrepl_socket->send_queue, req);
-               DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *);
-               req->state = WREPL_REQUEST_RECV;
+       wrepl_socket->event.ctx = event_ctx;
+       if (!wrepl_socket->event.ctx) {
+               goto failed;
+       }
 
-               EVENT_FD_READABLE(wrepl_socket->fde);
+       wrepl_socket->request_queue = tevent_queue_create(wrepl_socket,
+                                                         "wrepl request queue");
+       if (wrepl_socket->request_queue == NULL) {
+               goto failed;
        }
 
-       EVENT_FD_NOT_WRITEABLE(wrepl_socket->fde);
+       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
+
+       return wrepl_socket;
+
+failed:
+       talloc_free(wrepl_socket);
+       return NULL;
 }
 
+/*
+  initialise a wrepl_socket from an already existing connection
+*/
+NTSTATUS wrepl_socket_donate_stream(struct wrepl_socket *wrepl_socket,
+                                   struct tstream_context **stream)
+{
+       if (wrepl_socket->stream) {
+               return NT_STATUS_CONNECTION_ACTIVE;
+       }
+
+       wrepl_socket->stream = talloc_move(wrepl_socket, stream);
+       return NT_STATUS_OK;
+}
 
 /*
-  handle recv events 
+  initialise a wrepl_socket from an already existing connection
 */
-static void wrepl_handler_recv(struct wrepl_socket *wrepl_socket)
+NTSTATUS wrepl_socket_split_stream(struct wrepl_socket *wrepl_socket,
+                                  TALLOC_CTX *mem_ctx,
+                                  struct tstream_context **stream)
 {
-       size_t nread;
-       struct wrepl_request *req = wrepl_socket->recv_queue;
-       DATA_BLOB blob;
+       size_t num_requests;
 
-       if (req == NULL) {
-               EVENT_FD_NOT_READABLE(wrepl_socket->fde);
-               return;
+       if (!wrepl_socket->stream) {
+               return NT_STATUS_CONNECTION_INVALID;
        }
 
-       if (req->buffer.length == 0) {
-               req->buffer = data_blob_talloc(req, NULL, 4);
-               if (req->buffer.data == NULL) {
-                       req->status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               req->num_read = 0;
+       num_requests = tevent_queue_length(wrepl_socket->request_queue);
+       if (num_requests > 0) {
+               return NT_STATUS_CONNECTION_IN_USE;
        }
 
-       /* read in the packet length */
-       if (req->num_read < 4) {
-               uint32_t req_length;
-
-               req->status = socket_recv(wrepl_socket->sock, 
-                                         req->buffer.data + req->num_read,
-                                         4 - req->num_read,
-                                         &nread, 0);
-               if (NT_STATUS_IS_ERR(req->status)) goto failed;
-               if (!NT_STATUS_IS_OK(req->status)) return;
+       *stream = talloc_move(wrepl_socket, &wrepl_socket->stream);
+       return NT_STATUS_OK;
+}
 
-               req->num_read += nread;
-               if (req->num_read != 4) return;
+const char *wrepl_best_ip(struct loadparm_context *lp_ctx, const char *peer_ip)
+{
+       struct interface *ifaces;
+       load_interfaces(lp_ctx, lp_interfaces(lp_ctx), &ifaces);
+       return iface_best_ip(ifaces, peer_ip);
+}
 
-               req_length = RIVAL(req->buffer.data, 0) + 4;
+struct wrepl_connect_state {
+       struct {
+               struct wrepl_socket *wrepl_socket;
+               struct tevent_context *ev;
+       } caller;
+       struct tsocket_address *local_address;
+       struct tsocket_address *remote_address;
+       struct tstream_context *stream;
+};
+
+static void wrepl_connect_trigger(struct tevent_req *req,
+                                 void *private_date);
+
+struct tevent_req *wrepl_connect_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const char *our_ip, const char *peer_ip)
+{
+       struct tevent_req *req;
+       struct wrepl_connect_state *state;
+       int ret;
+       bool ok;
 
-               req->buffer.data = talloc_realloc(req, req->buffer.data, 
-                                                 uint8_t, req_length);
-               if (req->buffer.data == NULL) {
-                       req->status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               req->buffer.length = req_length;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_connect_state);
+       if (req == NULL) {
+               return NULL;
        }
 
-       /* read in the body */
-       req->status = socket_recv(wrepl_socket->sock, 
-                                 req->buffer.data + req->num_read,
-                                 req->buffer.length - req->num_read,
-                                 &nread, 0);
-       if (NT_STATUS_IS_ERR(req->status)) goto failed;
-       if (!NT_STATUS_IS_OK(req->status)) return;
-
-       req->num_read += nread;
-       if (req->num_read != req->buffer.length) return;
+       state->caller.wrepl_socket = wrepl_socket;
+       state->caller.ev = ev;
 
-       req->packet = talloc(req, struct wrepl_packet);
-       if (req->packet == NULL) {
-               req->status = NT_STATUS_NO_MEMORY;
-               goto failed;
+       if (wrepl_socket->stream) {
+               tevent_req_nterror(req, NT_STATUS_CONNECTION_ACTIVE);
+               return tevent_req_post(req, ev);
        }
 
-       blob.data = req->buffer.data + 4;
-       blob.length = req->buffer.length - 4;
-       
-       /* we have a full request - parse it */
-       req->status = ndr_pull_struct_blob(&blob,
-                                          req->packet, req->packet,
-                                          (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               DEBUG(2,("Failed to parse incoming WINS packet - %s\n",
-                        nt_errstr(req->status)));
-               DEBUG(10,("packet length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, req->packet);
-               goto failed;
+       ret = tsocket_address_inet_from_strings(state, "ipv4",
+                                               our_ip, 0,
+                                               &state->local_address);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix(errno);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
        }
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS packet of length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, req->packet);
+       ret = tsocket_address_inet_from_strings(state, "ipv4",
+                                               peer_ip, WINS_REPLICATION_PORT,
+                                               &state->remote_address);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix(errno);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
        }
 
-       DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       req->state = WREPL_REQUEST_DONE;
-       if (req->async.fn) {
-               req->async.fn(req);
+       ok = tevent_queue_add(wrepl_socket->request_queue,
+                             ev,
+                             req,
+                             wrepl_connect_trigger,
+                             NULL);
+       if (!ok) {
+               tevent_req_nomem(NULL, req);
+               return tevent_req_post(req, ev);
        }
-       return;
 
-failed:
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       }
-       req->state = WREPL_REQUEST_ERROR;
-       if (req->async.fn) {
-               req->async.fn(req);
+       if (wrepl_socket->request_timeout > 0) {
+               struct timeval endtime;
+               endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
+               ok = tevent_req_set_endtime(req, ev, endtime);
+               if (!ok) {
+                       return tevent_req_post(req, ev);
+               }
        }
+
+       return req;
 }
 
+static void wrepl_connect_done(struct tevent_req *subreq);
 
-/*
-  handler for winrepl events
-*/
-static void wrepl_handler(struct event_context *ev, struct fd_event *fde, 
-                         uint16_t flags, void *private)
+static void wrepl_connect_trigger(struct tevent_req *req,
+                                 void *private_date)
 {
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
-                                                           struct wrepl_socket);
-       if (flags & EVENT_FD_WRITE) {
-               wrepl_handler_send(wrepl_socket);
-       }
-       if (flags & EVENT_FD_READ) {
-               wrepl_handler_recv(wrepl_socket);
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       struct tevent_req *subreq;
+
+       subreq = tstream_inet_tcp_connect_send(state,
+                                              state->caller.ev,
+                                              state->local_address,
+                                              state->remote_address);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
        }
+       tevent_req_set_callback(subreq, wrepl_connect_done, req);
+
+       return;
 }
 
+static void wrepl_connect_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_inet_tcp_connect_recv(subreq, &sys_errno,
+                                           state, &state->stream);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix(sys_errno);
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       tevent_req_done(req);
+}
 
 /*
-  handler for winrepl connection completion
+  connect a wrepl_socket to a WINS server - recv side
 */
-static void wrepl_connect_handler(struct event_context *ev, struct fd_event *fde, 
-                                 uint16_t flags, void *private)
+NTSTATUS wrepl_connect_recv(struct tevent_req *req)
 {
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
-                                                           struct wrepl_socket);
-       struct wrepl_request *req = wrepl_socket->recv_queue;
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       struct wrepl_socket *wrepl_socket = state->caller.wrepl_socket;
+       NTSTATUS status;
 
-       talloc_free(fde);
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
 
-       if (req == NULL) return;
+       wrepl_socket->stream = talloc_move(wrepl_socket, &state->stream);
 
-       req->status = socket_connect_complete(wrepl_socket->sock, 0);
-       if (NT_STATUS_IS_ERR(req->status)) goto failed;
+       tevent_req_received(req);
+       return NT_STATUS_OK;
+}
+
+/*
+  connect a wrepl_socket to a WINS server - sync API
+*/
+NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket,
+                      const char *our_ip, const char *peer_ip)
+{
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
 
-       if (!NT_STATUS_IS_OK(req->status)) return;
+       subreq = wrepl_connect_send(wrepl_socket, wrepl_socket->event.ctx,
+                                   wrepl_socket, our_ip, peer_ip);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
 
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, 
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        0,
-                                        wrepl_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
-               req->status = NT_STATUS_NO_MEMORY;
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
        }
 
+       status = wrepl_connect_recv(subreq);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
 
-failed:
-       DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               req->state = WREPL_REQUEST_ERROR;
-       } else {
-               req->state = WREPL_REQUEST_DONE;
-       }
-       if (req->async.fn) {
-               req->async.fn(req);
-       }
+       return NT_STATUS_OK;
 }
 
-
-/*
-  initialise a wrepl_socket. The event_ctx is optional, if provided then
-  operations will use that event context
-*/
-struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx, 
-                                      struct event_context *event_ctx)
+struct wrepl_request_state {
+       struct {
+               struct wrepl_socket *wrepl_socket;
+               struct tevent_context *ev;
+       } caller;
+       struct wrepl_send_ctrl ctrl;
+       struct {
+               struct wrepl_wrap wrap;
+               DATA_BLOB blob;
+               struct iovec iov;
+       } req;
+       bool one_way;
+       struct {
+               DATA_BLOB blob;
+               struct wrepl_packet *packet;
+       } rep;
+};
+
+static void wrepl_request_trigger(struct tevent_req *req,
+                                 void *private_data);
+
+struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const struct wrepl_packet *packet,
+                                     const struct wrepl_send_ctrl *ctrl)
 {
-       struct wrepl_socket *wrepl_socket;
+       struct tevent_req *req;
+       struct wrepl_request_state *state;
        NTSTATUS status;
+       enum ndr_err_code ndr_err;
+       bool ok;
 
-       wrepl_socket = talloc(mem_ctx, struct wrepl_socket);
-       if (wrepl_socket == NULL) goto failed;
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
+       }
 
-       if (event_ctx == NULL) {
-               wrepl_socket->event_ctx = event_context_init(wrepl_socket);
-       } else {
-               wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx);
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_request_state);
+       if (req == NULL) {
+               return NULL;
        }
-       if (wrepl_socket->event_ctx == NULL) goto failed;
 
-       status = socket_create("ip", SOCKET_TYPE_STREAM, &wrepl_socket->sock, 0);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       state->caller.wrepl_socket = wrepl_socket;
+       state->caller.ev = ev;
 
-       talloc_steal(wrepl_socket, wrepl_socket->sock);
+       if (ctrl) {
+               state->ctrl = *ctrl;
+       }
 
-       wrepl_socket->send_queue = NULL;
-       wrepl_socket->recv_queue = NULL;
+       if (wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
+               return tevent_req_post(req, ev);
+       }
 
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, 
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        EVENT_FD_WRITE,
-                                        wrepl_connect_handler, wrepl_socket);
+       state->req.wrap.packet = *packet;
+       ndr_err = ndr_push_struct_blob(&state->req.blob, state,
+                                      &state->req.wrap,
+                                      (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
+       }
 
-       set_blocking(socket_get_fd(wrepl_socket->sock), False);
-       
-       return wrepl_socket;
+       state->req.iov.iov_base = state->req.blob.data;
+       state->req.iov.iov_len = state->req.blob.length;
+
+       ok = tevent_queue_add(wrepl_socket->request_queue,
+                             ev,
+                             req,
+                             wrepl_request_trigger,
+                             NULL);
+       if (!ok) {
+               tevent_req_nomem(NULL, req);
+               return tevent_req_post(req, ev);
+       }
 
-failed:
-       talloc_free(wrepl_socket);
-       return NULL;
+       if (wrepl_socket->request_timeout > 0) {
+               struct timeval endtime;
+               endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
+               ok = tevent_req_set_endtime(req, ev, endtime);
+               if (!ok) {
+                       return tevent_req_post(req, ev);
+               }
+       }
+
+       return req;
 }
 
+static void wrepl_request_writev_done(struct tevent_req *subreq);
 
-/*
-  destroy a wrepl_request
-*/
-static int wrepl_request_destructor(void *ptr)
+static void wrepl_request_trigger(struct tevent_req *req,
+                                 void *private_data)
 {
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       if (req->state == WREPL_REQUEST_SEND) {
-               DLIST_REMOVE(req->wrepl_socket->send_queue, req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       struct tevent_req *subreq;
+
+       if (state->caller.wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
+               return;
        }
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(req->wrepl_socket->recv_queue, req);
+
+       if (DEBUGLVL(10)) {
+               DEBUG(10,("Sending WINS packet of length %u\n",
+                         (unsigned)state->req.blob.length));
+               NDR_PRINT_DEBUG(wrepl_packet, &state->req.wrap.packet);
        }
-       req->state = WREPL_REQUEST_ERROR;
-       return 0;
-}
 
-/*
-  wait for a request to complete
-*/
-static NTSTATUS wrepl_request_wait(struct wrepl_request *req)
-{
-       NT_STATUS_HAVE_NO_MEMORY(req);
-       while (req->state < WREPL_REQUEST_DONE) {
-               event_loop_once(req->wrepl_socket->event_ctx);
+       subreq = tstream_writev_send(state,
+                                    state->caller.ev,
+                                    state->caller.wrepl_socket->stream,
+                                    &state->req.iov, 1);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
        }
-       return req->status;
+       tevent_req_set_callback(subreq, wrepl_request_writev_done, req);
 }
 
+static void wrepl_request_disconnect_done(struct tevent_req *subreq);
+static void wrepl_request_read_pdu_done(struct tevent_req *subreq);
 
-/*
-  connect a wrepl_socket to a WINS server
-*/
-struct wrepl_request *wrepl_connect_send(struct wrepl_socket *wrepl_socket,
-                                        const char *address)
+static void wrepl_request_writev_done(struct tevent_req *subreq)
 {
-       struct wrepl_request *req;
-       NTSTATUS status;
-
-       req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (req == NULL) goto failed;
-
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_RECV;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_writev_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               NTSTATUS status = map_nt_error_from_unix(sys_errno);
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       DLIST_ADD(wrepl_socket->recv_queue, req);
+       if (state->caller.wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
+               return;
+       }
 
-       talloc_set_destructor(req, wrepl_request_destructor);
-       
-       status = socket_connect(wrepl_socket->sock, iface_best_ip(address), 0, address, 
-                               WINS_REPLICATION_PORT, 0);
-       if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) goto failed;
+       if (state->ctrl.disconnect_after_send) {
+               subreq = tstream_disconnect_send(state,
+                                                state->caller.ev,
+                                                state->caller.wrepl_socket->stream);
+               if (tevent_req_nomem(subreq, req)) {
+                       return;
+               }
+               tevent_req_set_callback(subreq, wrepl_request_disconnect_done, req);
+               return;
+       }
 
-       return req;
+       if (state->ctrl.send_only) {
+               tevent_req_done(req);
+               return;
+       }
 
-failed:
-       talloc_free(req);
-       return NULL;
+       subreq = tstream_read_pdu_blob_send(state,
+                                           state->caller.ev,
+                                           state->caller.wrepl_socket->stream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, wrepl_request_read_pdu_done, req);
 }
 
-/*
-  connect a wrepl_socket to a WINS server - recv side
-*/
-NTSTATUS wrepl_connect_recv(struct wrepl_request *req)
+static void wrepl_request_disconnect_done(struct tevent_req *subreq)
 {
-       return wrepl_request_wait(req);
-}
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_disconnect_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               NTSTATUS status = map_nt_error_from_unix(sys_errno);
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
+       DEBUG(10,("WINS connection disconnected\n"));
+       TALLOC_FREE(state->caller.wrepl_socket->stream);
 
-/*
-  connect a wrepl_socket to a WINS server - sync API
-*/
-NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket, const char *address)
-{
-       struct wrepl_request *req = wrepl_connect_send(wrepl_socket, address);
-       return wrepl_connect_recv(req);
+       tevent_req_done(req);
 }
 
-
-/*
-  send a generic wins replication request
-*/
-struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket,
-                                        struct wrepl_packet *packet)
+static void wrepl_request_read_pdu_done(struct tevent_req *subreq)
 {
-       struct wrepl_request *req;
-       struct wrepl_wrap wrap;
-
-       req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (req == NULL) goto failed;
-
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_SEND;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
+       DATA_BLOB blob;
+       enum ndr_err_code ndr_err;
 
-       wrap.packet = *packet;
-       req->status = ndr_push_struct_blob(&req->buffer, req, &wrap,
-                                          (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);   
-       if (!NT_STATUS_IS_OK(req->status)) goto failed;
+       status = tstream_read_pdu_blob_recv(subreq, state, &state->rep.blob);
+       if (!NT_STATUS_IS_OK(status)) {
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS packet of length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, &wrap.packet);
+       state->rep.packet = talloc(state, struct wrepl_packet);
+       if (tevent_req_nomem(state->rep.packet, req)) {
+               return;
        }
 
-       DLIST_ADD(wrepl_socket->send_queue, req);
+       blob.data = state->rep.blob.data + 4;
+       blob.length = state->rep.blob.length - 4;
 
-       talloc_set_destructor(req, wrepl_request_destructor);
+       /* we have a full request - parse it */
+       ndr_err = ndr_pull_struct_blob(&blob,
+                                      state->rep.packet,
+                                      state->rep.packet,
+                                      (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       EVENT_FD_WRITEABLE(wrepl_socket->fde);
-       
-       return req;
+       if (DEBUGLVL(10)) {
+               DEBUG(10,("Received WINS packet of length %u\n",
+                         (unsigned)state->rep.blob.length));
+               NDR_PRINT_DEBUG(wrepl_packet, state->rep.packet);
+       }
 
-failed:
-       talloc_free(req);
-       return NULL;
+       tevent_req_done(req);
 }
 
-/*
-  receive a generic WINS replication reply
-*/
-NTSTATUS wrepl_request_recv(struct wrepl_request *req,
+NTSTATUS wrepl_request_recv(struct tevent_req *req,
                            TALLOC_CTX *mem_ctx,
                            struct wrepl_packet **packet)
 {
-       NTSTATUS status = wrepl_request_wait(req);
-       if (NT_STATUS_IS_OK(status)) {
-               *packet = talloc_steal(mem_ctx, req->packet);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_received(req);
+               return status;
+       }
+
+       if (packet) {
+               *packet = talloc_move(mem_ctx, &state->rep.packet);
        }
-       talloc_free(req);
-       return status;
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
@@ -427,55 +578,135 @@ NTSTATUS wrepl_request_recv(struct wrepl_request *req,
 */
 NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
                       TALLOC_CTX *mem_ctx,
-                      struct wrepl_packet *req_packet,
+                      const struct wrepl_packet *req_packet,
                       struct wrepl_packet **reply_packet)
 {
-       struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet);
-       return wrepl_request_recv(req, mem_ctx, reply_packet);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx,
+                                   wrepl_socket, req_packet, NULL);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_request_recv(subreq, mem_ctx, reply_packet);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
 
-/*
-  setup an association - send
-*/
-struct wrepl_request *wrepl_associate_send(struct wrepl_socket *wrepl_socket,
-                                          struct wrepl_associate *io)
+struct wrepl_associate_state {
+       struct wrepl_packet packet;
+       uint32_t assoc_ctx;
+       uint16_t major_version;
+};
+
+static void wrepl_associate_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct wrepl_socket *wrepl_socket,
+                                       const struct wrepl_associate *io)
 {
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct tevent_req *req;
+       struct wrepl_associate_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_associate_state);
+       if (req == NULL) {
+               return NULL;
+       };
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.mess_type                         = WREPL_START_ASSOCIATION;
+       state->packet.message.start.minor_version       = 2;
+       state->packet.message.start.major_version       = 5;
+
+       /*
+        * nt4 uses 41 bytes for the start_association call
+        * so do it the same and as we don't know th emeanings of this bytes
+        * we just send zeros and nt4, w2k and w2k3 seems to be happy with this
+        *
+        * if we don't do this nt4 uses an old version of the wins replication protocol
+        * and that would break nt4 <-> samba replication
+        */
+       state->packet.padding   = data_blob_talloc(state, NULL, 21);
+       if (tevent_req_nomem(state->packet.padding.data, req)) {
+               return tevent_req_post(req, ev);
+       }
+       memset(state->packet.padding.data, 0, state->packet.padding.length);
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_associate_done, req);
 
-       packet->opcode                      = WREPL_OPCODE_BITS;
-       packet->mess_type                   = WREPL_START_ASSOCIATION;
-       packet->message.start.minor_version = 2;
-       packet->message.start.major_version = 5;
+       return req;
+}
 
-       req = wrepl_request_send(wrepl_socket, packet);
+static void wrepl_associate_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_associate_state *state = tevent_req_data(req,
+                                             struct wrepl_associate_state);
+       NTSTATUS status;
+       struct wrepl_packet *packet;
+
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-       talloc_free(packet);
+       state->assoc_ctx = packet->message.start_reply.assoc_ctx;
+       state->major_version = packet->message.start_reply.major_version;
 
-       return req;     
+       tevent_req_done(req);
 }
 
 /*
   setup an association - recv
 */
-NTSTATUS wrepl_associate_recv(struct wrepl_request *req,
+NTSTATUS wrepl_associate_recv(struct tevent_req *req,
                              struct wrepl_associate *io)
 {
-       struct wrepl_packet *packet=NULL;
+       struct wrepl_associate_state *state = tevent_req_data(req,
+                                             struct wrepl_associate_state);
        NTSTATUS status;
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-       }
-       if (NT_STATUS_IS_OK(status)) {
-               io->out.assoc_ctx = packet->message.start_reply.assoc_ctx;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
-       talloc_free(packet);
-       return status;
+
+       io->out.assoc_ctx = state->assoc_ctx;
+       io->out.major_version = state->major_version;
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
@@ -484,206 +715,430 @@ NTSTATUS wrepl_associate_recv(struct wrepl_request *req,
 NTSTATUS wrepl_associate(struct wrepl_socket *wrepl_socket,
                         struct wrepl_associate *io)
 {
-       struct wrepl_request *req = wrepl_associate_send(wrepl_socket, io);
-       return wrepl_associate_recv(req, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_associate_send(wrepl_socket, wrepl_socket->event.ctx,
+                                     wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_associate_recv(subreq, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
+struct wrepl_associate_stop_state {
+       struct wrepl_packet packet;
+       struct wrepl_send_ctrl ctrl;
+};
 
-/*
-  fetch the partner tables - send
-*/
-struct wrepl_request *wrepl_pull_table_send(struct wrepl_socket *wrepl_socket,
-                                           struct wrepl_pull_table *io)
+static void wrepl_associate_stop_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
+                                            struct tevent_context *ev,
+                                            struct wrepl_socket *wrepl_socket,
+                                            const struct wrepl_associate_stop *io)
 {
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct tevent_req *req;
+       struct wrepl_associate_stop_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
+       }
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_associate_stop_state);
+       if (req == NULL) {
+               return NULL;
+       };
 
-       packet->opcode                      = WREPL_OPCODE_BITS;
-       packet->assoc_ctx                   = io->in.assoc_ctx;
-       packet->mess_type                   = WREPL_REPLICATION;
-       packet->message.replication.command = WREPL_REPL_TABLE_QUERY;
+       state->packet.opcode                    = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                 = io->in.assoc_ctx;
+       state->packet.mess_type                 = WREPL_STOP_ASSOCIATION;
+       state->packet.message.stop.reason       = io->in.reason;
 
-       req = wrepl_request_send(wrepl_socket, packet);
+       if (io->in.reason == 0) {
+               state->ctrl.send_only                   = true;
+               state->ctrl.disconnect_after_send       = true;
+       }
 
-       talloc_free(packet);
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_associate_stop_done, req);
 
-       return req;     
+       return req;
 }
 
+static void wrepl_associate_stop_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_associate_stop_state *state = tevent_req_data(req,
+                                                  struct wrepl_associate_stop_state);
+       NTSTATUS status;
+
+       /* currently we don't care about a possible response */
+       status = wrepl_request_recv(subreq, state, NULL);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       tevent_req_done(req);
+}
 
 /*
-  fetch the partner tables - recv
+  stop an association - recv
 */
-NTSTATUS wrepl_pull_table_recv(struct wrepl_request *req,
-                              TALLOC_CTX *mem_ctx,
-                              struct wrepl_pull_table *io)
+NTSTATUS wrepl_associate_stop_recv(struct tevent_req *req,
+                                  struct wrepl_associate_stop *io)
 {
-       struct wrepl_packet *packet=NULL;
        NTSTATUS status;
-       struct wrepl_table *table;
-       int i;
-
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       if (packet->mess_type != WREPL_REPLICATION) {
-               status = NT_STATUS_NETWORK_ACCESS_DENIED;
-       } else if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-       }
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       table = &packet->message.replication.info.table;
-       io->out.num_partners = table->partner_count;
-       io->out.partners = talloc_steal(mem_ctx, table->partners);
-       for (i=0;i<io->out.num_partners;i++) {
-               talloc_steal(io->out.partners, io->out.partners[i].address);
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
 
-failed:
-       talloc_free(packet);
-       return status;
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
-
 /*
-  fetch the partner table - sync api
+  setup an association - sync api
 */
-NTSTATUS wrepl_pull_table(struct wrepl_socket *wrepl_socket,
-                         TALLOC_CTX *mem_ctx,
-                         struct wrepl_pull_table *io)
+NTSTATUS wrepl_associate_stop(struct wrepl_socket *wrepl_socket,
+                             struct wrepl_associate_stop *io)
 {
-       struct wrepl_request *req = wrepl_pull_table_send(wrepl_socket, io);
-       return wrepl_pull_table_recv(req, mem_ctx, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_associate_stop_send(wrepl_socket, wrepl_socket->event.ctx,
+                                          wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_associate_stop_recv(subreq, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
+struct wrepl_pull_table_state {
+       struct wrepl_packet packet;
+       uint32_t num_partners;
+       struct wrepl_wins_owner *partners;
+};
 
-/*
-  fetch the names for a WINS partner - send
-*/
-struct wrepl_request *wrepl_pull_names_send(struct wrepl_socket *wrepl_socket,
-                                           struct wrepl_pull_names *io)
+static void wrepl_pull_table_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct wrepl_socket *wrepl_socket,
+                                        const struct wrepl_pull_table *io)
+{
+       struct tevent_req *req;
+       struct wrepl_pull_table_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_pull_table_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_pull_table_state);
+       if (req == NULL) {
+               return NULL;
+       };
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                         = io->in.assoc_ctx;
+       state->packet.mess_type                         = WREPL_REPLICATION;
+       state->packet.message.replication.command       = WREPL_REPL_TABLE_QUERY;
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_pull_table_done, req);
+
+       return req;
+}
+
+static void wrepl_pull_table_done(struct tevent_req *subreq)
 {
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_pull_table_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_table_state);
+       NTSTATUS status;
        struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct wrepl_table *table;
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       packet->opcode                         = WREPL_OPCODE_BITS;
-       packet->assoc_ctx                      = io->in.assoc_ctx;
-       packet->mess_type                      = WREPL_REPLICATION;
-       packet->message.replication.command    = WREPL_REPL_SEND_REQUEST;
-       packet->message.replication.info.owner = io->in.partner;
+       if (packet->mess_type != WREPL_REPLICATION) {
+               tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
+               return;
+       }
+
+       if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-       req = wrepl_request_send(wrepl_socket, packet);
+       table = &packet->message.replication.info.table;
 
-       talloc_free(packet);
+       state->num_partners = table->partner_count;
+       state->partners = talloc_move(state, &table->partners);
 
-       return req;     
+       tevent_req_done(req);
 }
 
+/*
+  fetch the partner tables - recv
+*/
+NTSTATUS wrepl_pull_table_recv(struct tevent_req *req,
+                              TALLOC_CTX *mem_ctx,
+                              struct wrepl_pull_table *io)
+{
+       struct wrepl_pull_table_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_table_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       io->out.num_partners = state->num_partners;
+       io->out.partners = talloc_move(mem_ctx, &state->partners);
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
+}
 
 /*
-  extract a nbt_name from a WINS name buffer
+  fetch the partner table - sync api
 */
-static NTSTATUS wrepl_extract_name(struct nbt_name *name,
-                                  TALLOC_CTX *mem_ctx,
-                                  uint8_t *namebuf, uint32_t len)
+NTSTATUS wrepl_pull_table(struct wrepl_socket *wrepl_socket,
+                         TALLOC_CTX *mem_ctx,
+                         struct wrepl_pull_table *io)
 {
-       char *s;
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_pull_table_send(mem_ctx, wrepl_socket->event.ctx,
+                                      wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
 
-       /* oh wow, what a nasty bug in windows ... */
-       if (namebuf[0] == 0x1b && len >= 16) {
-               namebuf[0] = namebuf[15];
-               namebuf[15] = 0x1b;
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
        }
 
-       if (len < 17) {
-               make_nbt_name_client(name, talloc_strndup(mem_ctx, namebuf, len));
-               return NT_STATUS_OK;
+       status = wrepl_pull_table_recv(subreq, mem_ctx, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
+}
+
+
+struct wrepl_pull_names_state {
+       struct {
+               const struct wrepl_pull_names *io;
+       } caller;
+       struct wrepl_packet packet;
+       uint32_t num_names;
+       struct wrepl_name *names;
+};
+
+static void wrepl_pull_names_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct wrepl_socket *wrepl_socket,
+                                        const struct wrepl_pull_names *io)
+{
+       struct tevent_req *req;
+       struct wrepl_pull_names_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_pull_names_send event context mismatch!");
+               return NULL;
        }
 
-       s = talloc_strndup(mem_ctx, namebuf, 15);
-       trim_string(s, NULL, " ");
-       name->name = s;
-       name->type = namebuf[15];
-       if (len > 18) {
-               name->scope = talloc_strndup(mem_ctx, namebuf+17, len-17);
-       } else {
-               name->scope = NULL;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_pull_names_state);
+       if (req == NULL) {
+               return NULL;
+       };
+       state->caller.io = io;
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                         = io->in.assoc_ctx;
+       state->packet.mess_type                         = WREPL_REPLICATION;
+       state->packet.message.replication.command       = WREPL_REPL_SEND_REQUEST;
+       state->packet.message.replication.info.owner    = io->in.partner;
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
        }
+       tevent_req_set_callback(subreq, wrepl_pull_names_done, req);
 
-       return NT_STATUS_OK;
+       return req;
 }
 
-/*
-  fetch the names for a WINS partner - recv
-*/
-NTSTATUS wrepl_pull_names_recv(struct wrepl_request *req,
-                              TALLOC_CTX *mem_ctx,
-                              struct wrepl_pull_names *io)
+static void wrepl_pull_names_done(struct tevent_req *subreq)
 {
-       struct wrepl_packet *packet=NULL;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_pull_names_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_names_state);
        NTSTATUS status;
-       int i;
+       struct wrepl_packet *packet;
+       uint32_t i;
 
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       if (packet->mess_type != WREPL_REPLICATION ||
-           packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       if (packet->mess_type != WREPL_REPLICATION) {
+               tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
+               return;
        }
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       io->out.num_names = packet->message.replication.info.reply.num_names;
+       if (packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-       status = NT_STATUS_NO_MEMORY;
+       state->num_names = packet->message.replication.info.reply.num_names;
 
-       io->out.names = talloc_array(packet, struct wrepl_name, io->out.num_names);
-       if (io->out.names == NULL) goto failed;
+       state->names = talloc_array(state, struct wrepl_name, state->num_names);
+       if (tevent_req_nomem(state->names, req)) {
+               return;
+       }
 
        /* convert the list of names and addresses to a sane format */
-       for (i=0;i<io->out.num_names;i++) {
+       for (i=0; i < state->num_names; i++) {
                struct wrepl_wins_name *wname = &packet->message.replication.info.reply.names[i];
-               struct wrepl_name *name = &io->out.names[i];
-               status = wrepl_extract_name(&name->name, io->out.names, 
-                                           wname->name, wname->name_len);
-               if (!NT_STATUS_IS_OK(status)) goto failed;
+               struct wrepl_name *name = &state->names[i];
+
+               name->name      = *wname->name;
+               talloc_steal(state->names, wname->name);
+               name->type      = WREPL_NAME_TYPE(wname->flags);
+               name->state     = WREPL_NAME_STATE(wname->flags);
+               name->node      = WREPL_NAME_NODE(wname->flags);
+               name->is_static = WREPL_NAME_IS_STATIC(wname->flags);
+               name->raw_flags = wname->flags;
+               name->version_id= wname->id;
+               name->owner     = talloc_strdup(state->names,
+                                               state->caller.io->in.partner.address);
+               if (tevent_req_nomem(name->owner, req)) {
+                       return;
+               }
 
                /* trying to save 1 or 2 bytes on the wire isn't a good idea */
                if (wname->flags & 2) {
-                       int j;
+                       uint32_t j;
 
                        name->num_addresses = wname->addresses.addresses.num_ips;
-                       name->addresses = talloc_array(io->out.names, 
-                                                      struct wrepl_address, 
+                       name->addresses = talloc_array(state->names,
+                                                      struct wrepl_address,
                                                       name->num_addresses);
-                       if (name->addresses == NULL) goto failed;
+                       if (tevent_req_nomem(name->addresses, req)) {
+                               return;
+                       }
+
                        for (j=0;j<name->num_addresses;j++) {
-                               name->addresses[j].owner = 
-                                       talloc_steal(name->addresses, 
-                                                    wname->addresses.addresses.ips[j].owner);
+                               name->addresses[j].owner =
+                                       talloc_move(name->addresses,
+                                                   &wname->addresses.addresses.ips[j].owner);
                                name->addresses[j].address = 
-                                       talloc_steal(name->addresses, 
-                                                    wname->addresses.addresses.ips[j].ip);
+                                       talloc_move(name->addresses,
+                                                   &wname->addresses.addresses.ips[j].ip);
                        }
                } else {
                        name->num_addresses = 1;
-                       name->addresses = talloc(io->out.names, struct wrepl_address);
-                       if (name->addresses == NULL) goto failed;
-                       name->addresses[0].owner = io->in.partner.address;
-                       name->addresses[0].address = talloc_steal(name->addresses,
-                                                                 wname->addresses.ip);
+                       name->addresses = talloc_array(state->names,
+                                                      struct wrepl_address,
+                                                      name->num_addresses);
+                       if (tevent_req_nomem(name->addresses, req)) {
+                               return;
+                       }
+
+                       name->addresses[0].owner = talloc_strdup(name->addresses, name->owner);
+                       if (tevent_req_nomem(name->addresses[0].owner, req)) {
+                               return;
+                       }
+                       name->addresses[0].address = talloc_move(name->addresses,
+                                                                &wname->addresses.ip);
                }
        }
 
-       talloc_steal(mem_ctx, io->out.names);
-       status = NT_STATUS_OK;
+       tevent_req_done(req);
+}
 
-failed:
-       talloc_free(packet);
-       return status;
+/*
+  fetch the names for a WINS partner - recv
+*/
+NTSTATUS wrepl_pull_names_recv(struct tevent_req *req,
+                              TALLOC_CTX *mem_ctx,
+                              struct wrepl_pull_names *io)
+{
+       struct wrepl_pull_names_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_names_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       io->out.num_names = state->num_names;
+       io->out.names = talloc_move(mem_ctx, &state->names);
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 
@@ -695,6 +1150,23 @@ NTSTATUS wrepl_pull_names(struct wrepl_socket *wrepl_socket,
                          TALLOC_CTX *mem_ctx,
                          struct wrepl_pull_names *io)
 {
-       struct wrepl_request *req = wrepl_pull_names_send(wrepl_socket, io);
-       return wrepl_pull_names_recv(req, mem_ctx, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_pull_names_send(mem_ctx, wrepl_socket->event.ctx,
+                                      wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_pull_names_recv(subreq, mem_ctx, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }