Finish removal of iconv_convenience in public API's.
[bbaumbach/samba-autobuild/.git] / source4 / libcli / wrepl / winsrepl.c
index 297dccbf388c6e27574443fa88d923544fee4415..75cb34f6327dfa6387d6628bb8e13d7ccee81c57 100644 (file)
@@ -4,10 +4,11 @@
    low level WINS replication client code
 
    Copyright (C) Andrew Tridgell 2005
+   Copyright (C) Stefan Metzmacher 2005-2010
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/events/events.h"
-#include "dlinklist.h"
-#include "lib/socket/socket.h"
+#include "../lib/util/dlinklist.h"
 #include "libcli/wrepl/winsrepl.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "lib/stream/packet.h"
+#include "system/network.h"
+#include "lib/socket/netif.h"
+#include "param/param.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "lib/tsocket/tsocket.h"
+#include "libcli/util/tstream.h"
 
 /*
-  mark all pending requests as dead - called when a socket error happens
+  main context structure for the wins replication client library
 */
-static void wrepl_socket_dead(struct wrepl_socket *wrepl_socket, NTSTATUS status)
-{
-       wrepl_socket->dead = True;
+struct wrepl_socket {
+       struct {
+               struct tevent_context *ctx;
+       } event;
 
-       if (wrepl_socket->fde) {
-               talloc_free(wrepl_socket->fde);
-               wrepl_socket->fde = NULL;
-       }
+       /* the default timeout for requests, 0 means no timeout */
+#define WREPL_SOCKET_REQUEST_TIMEOUT   (60)
+       uint32_t request_timeout;
 
-       if (wrepl_socket->sock) {
-               talloc_free(wrepl_socket->sock);
-               wrepl_socket->sock = NULL;
-       }
+       struct tevent_queue *request_queue;
 
-       if (NT_STATUS_EQUAL(NT_STATUS_UNSUCCESSFUL, status)) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-       }
-       while (wrepl_socket->send_queue) {
-               struct wrepl_request *req = wrepl_socket->send_queue;
-               DLIST_REMOVE(wrepl_socket->send_queue, req);
-               req->state = WREPL_REQUEST_ERROR;
-               req->status = status;
-               if (req->async.fn) {
-                       req->async.fn(req);
-               }
+       struct tstream_context *stream;
+};
+
+bool wrepl_socket_is_connected(struct wrepl_socket *wrepl_sock)
+{
+       if (!wrepl_sock) {
+               return false;
        }
-       while (wrepl_socket->recv_queue) {
-               struct wrepl_request *req = wrepl_socket->recv_queue;
-               DLIST_REMOVE(wrepl_socket->recv_queue, req);
-               req->state = WREPL_REQUEST_ERROR;
-               req->status = status;
-               if (req->async.fn) {
-                       req->async.fn(req);
-               }
+
+       if (!wrepl_sock->stream) {
+               return false;
        }
-}
 
-static void wrepl_request_timeout_handler(struct event_context *ev, struct timed_event *te,
-                                         struct timeval t, void *ptr)
-{
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       wrepl_socket_dead(req->wrepl_socket, NT_STATUS_IO_TIMEOUT);
+       return true;
 }
 
 /*
-  handle send events 
+  initialise a wrepl_socket. The event_ctx is optional, if provided then
+  operations will use that event context
 */
-static void wrepl_handler_send(struct wrepl_socket *wrepl_socket)
+struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *event_ctx)
 {
-       while (wrepl_socket->send_queue) {
-               struct wrepl_request *req = wrepl_socket->send_queue;
-               size_t nsent;
-               NTSTATUS status;
-
-               status = socket_send(wrepl_socket->sock, &req->buffer, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       wrepl_socket_dead(wrepl_socket, status);
-                       return;
-               }
-               if (!NT_STATUS_IS_OK(status) || nsent == 0) return;
+       struct wrepl_socket *wrepl_socket;
 
-               req->buffer.data   += nsent;
-               req->buffer.length -= nsent;
-               if (req->buffer.length != 0) {
-                       return;
-               }
+       wrepl_socket = talloc_zero(mem_ctx, struct wrepl_socket);
+       if (!wrepl_socket) {
+               return NULL;
+       }
 
-               DLIST_REMOVE(wrepl_socket->send_queue, req);
-               DLIST_ADD_END(wrepl_socket->recv_queue, req, struct wrepl_request *);
-               req->state = WREPL_REQUEST_RECV;
+       wrepl_socket->event.ctx = event_ctx;
+       if (!wrepl_socket->event.ctx) {
+               goto failed;
+       }
 
-               EVENT_FD_READABLE(wrepl_socket->fde);
+       wrepl_socket->request_queue = tevent_queue_create(wrepl_socket,
+                                                         "wrepl request queue");
+       if (wrepl_socket->request_queue == NULL) {
+               goto failed;
        }
 
-       EVENT_FD_NOT_WRITEABLE(wrepl_socket->fde);
-}
+       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
+
+       return wrepl_socket;
 
+failed:
+       talloc_free(wrepl_socket);
+       return NULL;
+}
 
 /*
-  handle recv events 
+  initialise a wrepl_socket from an already existing connection
 */
-static void wrepl_handler_recv(struct wrepl_socket *wrepl_socket)
+NTSTATUS wrepl_socket_donate_stream(struct wrepl_socket *wrepl_socket,
+                                   struct tstream_context **stream)
 {
-       size_t nread;
-       struct wrepl_request *req = wrepl_socket->recv_queue;
-       DATA_BLOB blob;
+       if (wrepl_socket->stream) {
+               return NT_STATUS_CONNECTION_ACTIVE;
+       }
 
-       if (req == NULL) {
-               NTSTATUS status;
+       wrepl_socket->stream = talloc_move(wrepl_socket, stream);
+       return NT_STATUS_OK;
+}
 
-               EVENT_FD_NOT_READABLE(wrepl_socket->fde);
+/*
+  initialise a wrepl_socket from an already existing connection
+*/
+NTSTATUS wrepl_socket_split_stream(struct wrepl_socket *wrepl_socket,
+                                  TALLOC_CTX *mem_ctx,
+                                  struct tstream_context **stream)
+{
+       size_t num_requests;
 
-               status = socket_recv(wrepl_socket->sock, NULL, 0, &nread, 0);
-               if (NT_STATUS_EQUAL(NT_STATUS_END_OF_FILE,status)) return;
-               if (NT_STATUS_IS_ERR(status)) {
-                       wrepl_socket_dead(wrepl_socket, status);
-                       return;
-               }
-               return;
+       if (!wrepl_socket->stream) {
+               return NT_STATUS_CONNECTION_INVALID;
        }
 
-       if (req->buffer.length == 0) {
-               req->buffer = data_blob_talloc(req, NULL, 4);
-               if (req->buffer.data == NULL) {
-                       req->status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               req->num_read = 0;
+       num_requests = tevent_queue_length(wrepl_socket->request_queue);
+       if (num_requests > 0) {
+               return NT_STATUS_CONNECTION_IN_USE;
        }
 
-       /* read in the packet length */
-       if (req->num_read < 4) {
-               uint32_t req_length;
-
-               req->status = socket_recv(wrepl_socket->sock, 
-                                         req->buffer.data + req->num_read,
-                                         4 - req->num_read,
-                                         &nread, 0);
-               if (NT_STATUS_IS_ERR(req->status)) {
-                       wrepl_socket_dead(wrepl_socket, req->status);
-                       return;
-               }
-               if (!NT_STATUS_IS_OK(req->status)) return;
-
-               req->num_read += nread;
-               if (req->num_read != 4) return;
+       *stream = talloc_move(wrepl_socket, &wrepl_socket->stream);
+       return NT_STATUS_OK;
+}
 
-               req_length = RIVAL(req->buffer.data, 0) + 4;
+const char *wrepl_best_ip(struct loadparm_context *lp_ctx, const char *peer_ip)
+{
+       struct interface *ifaces;
+       load_interfaces(lp_ctx, lp_interfaces(lp_ctx), &ifaces);
+       return iface_best_ip(ifaces, peer_ip);
+}
 
-               req->buffer.data = talloc_realloc(req, req->buffer.data, 
-                                                 uint8_t, req_length);
-               if (req->buffer.data == NULL) {
-                       req->status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               req->buffer.length = req_length;
-       }
+struct wrepl_connect_state {
+       struct {
+               struct wrepl_socket *wrepl_socket;
+               struct tevent_context *ev;
+       } caller;
+       struct tsocket_address *local_address;
+       struct tsocket_address *remote_address;
+       struct tstream_context *stream;
+};
+
+static void wrepl_connect_trigger(struct tevent_req *req,
+                                 void *private_date);
+
+struct tevent_req *wrepl_connect_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const char *our_ip, const char *peer_ip)
+{
+       struct tevent_req *req;
+       struct wrepl_connect_state *state;
+       int ret;
+       bool ok;
 
-       /* read in the body */
-       req->status = socket_recv(wrepl_socket->sock, 
-                                 req->buffer.data + req->num_read,
-                                 req->buffer.length - req->num_read,
-                                 &nread, 0);
-       if (NT_STATUS_IS_ERR(req->status)) {
-               wrepl_socket_dead(wrepl_socket, req->status);
-               return;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_connect_state);
+       if (req == NULL) {
+               return NULL;
        }
-       if (!NT_STATUS_IS_OK(req->status)) return;
 
-       req->num_read += nread;
-       if (req->num_read != req->buffer.length) return;
+       state->caller.wrepl_socket = wrepl_socket;
+       state->caller.ev = ev;
 
-       req->packet = talloc(req, struct wrepl_packet);
-       if (req->packet == NULL) {
-               req->status = NT_STATUS_NO_MEMORY;
-               goto failed;
+       if (wrepl_socket->stream) {
+               tevent_req_nterror(req, NT_STATUS_CONNECTION_ACTIVE);
+               return tevent_req_post(req, ev);
        }
 
-       blob.data = req->buffer.data + 4;
-       blob.length = req->buffer.length - 4;
-       
-       /* we have a full request - parse it */
-       req->status = ndr_pull_struct_blob(&blob,
-                                          req->packet, req->packet,
-                                          (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               DEBUG(2,("Failed to parse incoming WINS packet - %s\n",
-                        nt_errstr(req->status)));
-               DEBUG(10,("packet length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, req->packet);
-               goto failed;
+       ret = tsocket_address_inet_from_strings(state, "ipv4",
+                                               our_ip, 0,
+                                               &state->local_address);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix(errno);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
        }
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS packet of length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, req->packet);
+       ret = tsocket_address_inet_from_strings(state, "ipv4",
+                                               peer_ip, WINS_REPLICATION_PORT,
+                                               &state->remote_address);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix(errno);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
        }
 
-       DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       req->state = WREPL_REQUEST_DONE;
-       if (req->async.fn) {
-               req->async.fn(req);
+       ok = tevent_queue_add(wrepl_socket->request_queue,
+                             ev,
+                             req,
+                             wrepl_connect_trigger,
+                             NULL);
+       if (!ok) {
+               tevent_req_nomem(NULL, req);
+               return tevent_req_post(req, ev);
        }
-       return;
 
-failed:
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       }
-       req->state = WREPL_REQUEST_ERROR;
-       if (req->async.fn) {
-               req->async.fn(req);
+       if (wrepl_socket->request_timeout > 0) {
+               struct timeval endtime;
+               endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
+               ok = tevent_req_set_endtime(req, ev, endtime);
+               if (!ok) {
+                       return tevent_req_post(req, ev);
+               }
        }
+
+       return req;
 }
 
+static void wrepl_connect_done(struct tevent_req *subreq);
 
-/*
-  handler for winrepl events
-*/
-static void wrepl_handler(struct event_context *ev, struct fd_event *fde, 
-                         uint16_t flags, void *private)
+static void wrepl_connect_trigger(struct tevent_req *req,
+                                 void *private_date)
 {
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
-                                                           struct wrepl_socket);
-       if (flags & EVENT_FD_WRITE) {
-               wrepl_handler_send(wrepl_socket);
-       }
-       if (flags & EVENT_FD_READ) {
-               wrepl_handler_recv(wrepl_socket);
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       struct tevent_req *subreq;
+
+       subreq = tstream_inet_tcp_connect_send(state,
+                                              state->caller.ev,
+                                              state->local_address,
+                                              state->remote_address);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
        }
+       tevent_req_set_callback(subreq, wrepl_connect_done, req);
+
+       return;
 }
 
+static void wrepl_connect_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_inet_tcp_connect_recv(subreq, &sys_errno,
+                                           state, &state->stream);
+       if (ret != 0) {
+               NTSTATUS status = map_nt_error_from_unix(sys_errno);
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       tevent_req_done(req);
+}
 
 /*
-  handler for winrepl connection completion
+  connect a wrepl_socket to a WINS server - recv side
 */
-static void wrepl_connect_handler(struct event_context *ev, struct fd_event *fde, 
-                                 uint16_t flags, void *private)
+NTSTATUS wrepl_connect_recv(struct tevent_req *req)
 {
-       struct wrepl_socket *wrepl_socket = talloc_get_type(private, 
-                                                           struct wrepl_socket);
-       struct wrepl_request *req = wrepl_socket->recv_queue;
-
-       talloc_free(wrepl_socket->fde);
-       wrepl_socket->fde = NULL;
-
-       if (req == NULL) return;
-
-       req->status = socket_connect_complete(wrepl_socket->sock, 0);
-       if (NT_STATUS_IS_ERR(req->status)) goto failed;
-
-       if (!NT_STATUS_IS_OK(req->status)) return;
+       struct wrepl_connect_state *state = tevent_req_data(req,
+                                           struct wrepl_connect_state);
+       struct wrepl_socket *wrepl_socket = state->caller.wrepl_socket;
+       NTSTATUS status;
 
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, 
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        0,
-                                        wrepl_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
-               req->status = NT_STATUS_NO_MEMORY;
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
 
+       wrepl_socket->stream = talloc_move(wrepl_socket, &state->stream);
 
-failed:
-       DLIST_REMOVE(wrepl_socket->recv_queue, req);
-       if (!NT_STATUS_IS_OK(req->status)) {
-               req->state = WREPL_REQUEST_ERROR;
-       } else {
-               req->state = WREPL_REQUEST_DONE;
-       }
-       if (req->async.fn) {
-               req->async.fn(req);
-       }
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
-  destroy a wrepl_socket destructor
+  connect a wrepl_socket to a WINS server - sync API
 */
-static int wrepl_socket_destructor(void *ptr)
+NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket,
+                      const char *our_ip, const char *peer_ip)
 {
-       struct wrepl_socket *sock = talloc_get_type(ptr, struct wrepl_socket);
-       wrepl_socket_dead(sock, NT_STATUS_CONNECTION_DISCONNECTED);
-       return 0;
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_connect_send(wrepl_socket, wrepl_socket->event.ctx,
+                                   wrepl_socket, our_ip, peer_ip);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_connect_recv(subreq);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
-/*
-  initialise a wrepl_socket. The event_ctx is optional, if provided then
-  operations will use that event context
-*/
-struct wrepl_socket *wrepl_socket_init(TALLOC_CTX *mem_ctx, 
-                                      struct event_context *event_ctx)
+struct wrepl_request_state {
+       struct {
+               struct wrepl_socket *wrepl_socket;
+               struct tevent_context *ev;
+       } caller;
+       struct wrepl_send_ctrl ctrl;
+       struct {
+               struct wrepl_wrap wrap;
+               DATA_BLOB blob;
+               struct iovec iov;
+       } req;
+       bool one_way;
+       struct {
+               DATA_BLOB blob;
+               struct wrepl_packet *packet;
+       } rep;
+};
+
+static void wrepl_request_trigger(struct tevent_req *req,
+                                 void *private_data);
+
+struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const struct wrepl_packet *packet,
+                                     const struct wrepl_send_ctrl *ctrl)
 {
-       struct wrepl_socket *wrepl_socket;
+       struct tevent_req *req;
+       struct wrepl_request_state *state;
        NTSTATUS status;
+       enum ndr_err_code ndr_err;
+       bool ok;
 
-       wrepl_socket = talloc(mem_ctx, struct wrepl_socket);
-       if (wrepl_socket == NULL) goto failed;
-
-       if (event_ctx == NULL) {
-               wrepl_socket->event_ctx = event_context_init(wrepl_socket);
-       } else {
-               wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx);
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
        }
-       if (wrepl_socket->event_ctx == NULL) goto failed;
 
-       status = socket_create("ip", SOCKET_TYPE_STREAM, &wrepl_socket->sock, 0);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_request_state);
+       if (req == NULL) {
+               return NULL;
+       }
 
-       talloc_steal(wrepl_socket, wrepl_socket->sock);
+       state->caller.wrepl_socket = wrepl_socket;
+       state->caller.ev = ev;
 
-       wrepl_socket->send_queue        = NULL;
-       wrepl_socket->recv_queue        = NULL;
-       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
-       wrepl_socket->dead              = False;
+       if (ctrl) {
+               state->ctrl = *ctrl;
+       }
 
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket, 
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        EVENT_FD_WRITE,
-                                        wrepl_connect_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
-               goto failed;
+       if (wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
+               return tevent_req_post(req, ev);
        }
 
-       set_blocking(socket_get_fd(wrepl_socket->sock), False);
+       state->req.wrap.packet = *packet;
+       ndr_err = ndr_push_struct_blob(&state->req.blob, state,
+                                      &state->req.wrap,
+                                      (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               tevent_req_nterror(req, status);
+               return tevent_req_post(req, ev);
+       }
 
-       talloc_set_destructor(wrepl_socket, wrepl_socket_destructor);
+       state->req.iov.iov_base = state->req.blob.data;
+       state->req.iov.iov_len = state->req.blob.length;
+
+       ok = tevent_queue_add(wrepl_socket->request_queue,
+                             ev,
+                             req,
+                             wrepl_request_trigger,
+                             NULL);
+       if (!ok) {
+               tevent_req_nomem(NULL, req);
+               return tevent_req_post(req, ev);
+       }
 
-       return wrepl_socket;
+       if (wrepl_socket->request_timeout > 0) {
+               struct timeval endtime;
+               endtime = tevent_timeval_current_ofs(wrepl_socket->request_timeout, 0);
+               ok = tevent_req_set_endtime(req, ev, endtime);
+               if (!ok) {
+                       return tevent_req_post(req, ev);
+               }
+       }
 
-failed:
-       talloc_free(wrepl_socket);
-       return NULL;
+       return req;
 }
 
-/*
-  initialise a wrepl_socket from an already existing connection
-*/
-struct wrepl_socket *wrepl_socket_merge(TALLOC_CTX *mem_ctx, 
-                                       struct event_context *event_ctx,
-                                       struct socket_context *socket)
+static void wrepl_request_writev_done(struct tevent_req *subreq);
+
+static void wrepl_request_trigger(struct tevent_req *req,
+                                 void *private_data)
 {
-       struct wrepl_socket *wrepl_socket;
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       struct tevent_req *subreq;
 
-       wrepl_socket = talloc(mem_ctx, struct wrepl_socket);
-       if (wrepl_socket == NULL) goto failed;
+       if (state->caller.wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
+               return;
+       }
 
-       wrepl_socket->event_ctx = talloc_reference(wrepl_socket, event_ctx);
-       if (wrepl_socket->event_ctx == NULL) goto failed;
+       if (DEBUGLVL(10)) {
+               DEBUG(10,("Sending WINS packet of length %u\n",
+                         (unsigned)state->req.blob.length));
+               NDR_PRINT_DEBUG(wrepl_packet, &state->req.wrap.packet);
+       }
 
-       wrepl_socket->sock = socket;
-       talloc_steal(wrepl_socket, wrepl_socket->sock);
+       subreq = tstream_writev_send(state,
+                                    state->caller.ev,
+                                    state->caller.wrepl_socket->stream,
+                                    &state->req.iov, 1);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, wrepl_request_writev_done, req);
+}
 
-       wrepl_socket->send_queue        = NULL;
-       wrepl_socket->recv_queue        = NULL;
-       wrepl_socket->request_timeout   = WREPL_SOCKET_REQUEST_TIMEOUT;
-       wrepl_socket->dead              = False;
+static void wrepl_request_disconnect_done(struct tevent_req *subreq);
+static void wrepl_request_read_pdu_done(struct tevent_req *subreq);
 
-       wrepl_socket->fde = event_add_fd(wrepl_socket->event_ctx, wrepl_socket,
-                                        socket_get_fd(wrepl_socket->sock), 
-                                        0,
-                                        wrepl_handler, wrepl_socket);
-       if (wrepl_socket->fde == NULL) {
-               goto failed;
+static void wrepl_request_writev_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_writev_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               NTSTATUS status = map_nt_error_from_unix(sys_errno);
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
        }
 
-       talloc_set_destructor(wrepl_socket, wrepl_socket_destructor);
-       
-       return wrepl_socket;
+       if (state->caller.wrepl_socket->stream == NULL) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
+               return;
+       }
 
-failed:
-       talloc_free(wrepl_socket);
-       return NULL;
-}
+       if (state->ctrl.disconnect_after_send) {
+               subreq = tstream_disconnect_send(state,
+                                                state->caller.ev,
+                                                state->caller.wrepl_socket->stream);
+               if (tevent_req_nomem(subreq, req)) {
+                       return;
+               }
+               tevent_req_set_callback(subreq, wrepl_request_disconnect_done, req);
+               return;
+       }
 
-/*
-  destroy a wrepl_request
-*/
-static int wrepl_request_destructor(void *ptr)
-{
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       if (req->state == WREPL_REQUEST_SEND) {
-               DLIST_REMOVE(req->wrepl_socket->send_queue, req);
+       if (state->ctrl.send_only) {
+               tevent_req_done(req);
+               return;
        }
-       if (req->state == WREPL_REQUEST_RECV) {
-               DLIST_REMOVE(req->wrepl_socket->recv_queue, req);
+
+       subreq = tstream_read_pdu_blob_send(state,
+                                           state->caller.ev,
+                                           state->caller.wrepl_socket->stream,
+                                           4, /* initial_read_size */
+                                           packet_full_request_u32,
+                                           NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
        }
-       req->state = WREPL_REQUEST_ERROR;
-       return 0;
+       tevent_req_set_callback(subreq, wrepl_request_read_pdu_done, req);
 }
 
-/*
-  wait for a request to complete
-*/
-static NTSTATUS wrepl_request_wait(struct wrepl_request *req)
+static void wrepl_request_disconnect_done(struct tevent_req *subreq)
 {
-       NT_STATUS_HAVE_NO_MEMORY(req);
-       while (req->state < WREPL_REQUEST_DONE) {
-               event_loop_once(req->wrepl_socket->event_ctx);
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       int ret;
+       int sys_errno;
+
+       ret = tstream_disconnect_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               NTSTATUS status = map_nt_error_from_unix(sys_errno);
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
        }
-       return req->status;
-}
 
+       DEBUG(10,("WINS connection disconnected\n"));
+       TALLOC_FREE(state->caller.wrepl_socket->stream);
 
-/*
-  connect a wrepl_socket to a WINS server
-*/
-struct wrepl_request *wrepl_connect_send(struct wrepl_socket *wrepl_socket,
-                                        const char *address)
+       tevent_req_done(req);
+}
+
+static void wrepl_request_read_pdu_done(struct tevent_req *subreq)
 {
-       struct wrepl_request *req;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
        NTSTATUS status;
+       DATA_BLOB blob;
+       enum ndr_err_code ndr_err;
 
-       req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (req == NULL) goto failed;
+       status = tstream_read_pdu_blob_recv(subreq, state, &state->rep.blob);
+       if (!NT_STATUS_IS_OK(status)) {
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_RECV;
+       state->rep.packet = talloc(state, struct wrepl_packet);
+       if (tevent_req_nomem(state->rep.packet, req)) {
+               return;
+       }
 
-       DLIST_ADD(wrepl_socket->recv_queue, req);
+       blob.data = state->rep.blob.data + 4;
+       blob.length = state->rep.blob.length - 4;
 
-       talloc_set_destructor(req, wrepl_request_destructor);
-       
-       status = socket_connect(wrepl_socket->sock, iface_best_ip(address), 0, address, 
-                               WINS_REPLICATION_PORT, 0);
-       if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) goto failed;
+       /* we have a full request - parse it */
+       ndr_err = ndr_pull_struct_blob(&blob,
+                                      state->rep.packet,
+                                      state->rep.packet,
+                                      (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       return req;
+       if (DEBUGLVL(10)) {
+               DEBUG(10,("Received WINS packet of length %u\n",
+                         (unsigned)state->rep.blob.length));
+               NDR_PRINT_DEBUG(wrepl_packet, state->rep.packet);
+       }
 
-failed:
-       talloc_free(req);
-       return NULL;
+       tevent_req_done(req);
 }
 
-/*
-  connect a wrepl_socket to a WINS server - recv side
-*/
-NTSTATUS wrepl_connect_recv(struct wrepl_request *req)
+NTSTATUS wrepl_request_recv(struct tevent_req *req,
+                           TALLOC_CTX *mem_ctx,
+                           struct wrepl_packet **packet)
 {
-       return wrepl_request_wait(req);
-}
-
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
 
-/*
-  connect a wrepl_socket to a WINS server - sync API
-*/
-NTSTATUS wrepl_connect(struct wrepl_socket *wrepl_socket, const char *address)
-{
-       struct wrepl_request *req = wrepl_connect_send(wrepl_socket, address);
-       return wrepl_connect_recv(req);
-}
+       if (tevent_req_is_nterror(req, &status)) {
+               TALLOC_FREE(state->caller.wrepl_socket->stream);
+               tevent_req_received(req);
+               return status;
+       }
 
-/* 
-   callback from wrepl_request_trigger() 
-*/
-static void wrepl_request_trigger_handler(struct event_context *ev, struct timed_event *te,
-                                         struct timeval t, void *ptr)
-{
-       struct wrepl_request *req = talloc_get_type(ptr, struct wrepl_request);
-       if (req->async.fn) {
-               req->async.fn(req);
+       if (packet) {
+               *packet = talloc_move(mem_ctx, &state->rep.packet);
        }
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
-  trigger an immediate event on a wrepl_request
+  a full WINS replication request/response
 */
-static void wrepl_request_trigger(struct wrepl_request *req)
+NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
+                      TALLOC_CTX *mem_ctx,
+                      const struct wrepl_packet *req_packet,
+                      struct wrepl_packet **reply_packet)
 {
-       /* a zero timeout means immediate */
-       event_add_timed(req->wrepl_socket->event_ctx,
-                       req, timeval_zero(),
-                       wrepl_request_trigger_handler, req);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx,
+                                   wrepl_socket, req_packet, NULL);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_request_recv(subreq, mem_ctx, reply_packet);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
 
-/*
-  send a generic wins replication request
-*/
-struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket,
-                                        struct wrepl_packet *packet)
-{
-       struct wrepl_request *req;
-       struct wrepl_wrap wrap;
+struct wrepl_associate_state {
+       struct wrepl_packet packet;
+       uint32_t assoc_ctx;
+       uint16_t major_version;
+};
 
-       req = talloc_zero(wrepl_socket, struct wrepl_request);
-       if (req == NULL) goto failed;
+static void wrepl_associate_done(struct tevent_req *subreq);
 
-       if (wrepl_socket->dead) {
-               req->wrepl_socket = wrepl_socket;
-               req->state        = WREPL_REQUEST_ERROR;
-               req->status       = NT_STATUS_INVALID_CONNECTION;
-               wrepl_request_trigger(req);
-               return req;
+struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct wrepl_socket *wrepl_socket,
+                                       const struct wrepl_associate *io)
+{
+       struct tevent_req *req;
+       struct wrepl_associate_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_send event context mismatch!");
+               return NULL;
        }
 
-       req->wrepl_socket = wrepl_socket;
-       req->state        = WREPL_REQUEST_SEND;
-
-       wrap.packet = *packet;
-       req->status = ndr_push_struct_blob(&req->buffer, req, &wrap,
-                                          (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
-       if (!NT_STATUS_IS_OK(req->status)) goto failed;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_associate_state);
+       if (req == NULL) {
+               return NULL;
+       };
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.mess_type                         = WREPL_START_ASSOCIATION;
+       state->packet.message.start.minor_version       = 2;
+       state->packet.message.start.major_version       = 5;
+
+       /*
+        * nt4 uses 41 bytes for the start_association call
+        * so do it the same and as we don't know th emeanings of this bytes
+        * we just send zeros and nt4, w2k and w2k3 seems to be happy with this
+        *
+        * if we don't do this nt4 uses an old version of the wins replication protocol
+        * and that would break nt4 <-> samba replication
+        */
+       state->packet.padding   = data_blob_talloc(state, NULL, 21);
+       if (tevent_req_nomem(state->packet.padding.data, req)) {
+               return tevent_req_post(req, ev);
+       }
+       memset(state->packet.padding.data, 0, state->packet.padding.length);
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS packet of length %d\n", (int)req->buffer.length));
-               NDR_PRINT_DEBUG(wrepl_packet, &wrap.packet);
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
        }
+       tevent_req_set_callback(subreq, wrepl_associate_done, req);
+
+       return req;
+}
 
-       DLIST_ADD(wrepl_socket->send_queue, req);
+static void wrepl_associate_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_associate_state *state = tevent_req_data(req,
+                                             struct wrepl_associate_state);
+       NTSTATUS status;
+       struct wrepl_packet *packet;
 
-       talloc_set_destructor(req, wrepl_request_destructor);
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       if (wrepl_socket->request_timeout > 0) {
-               req->te = event_add_timed(wrepl_socket->event_ctx, req, 
-                                         timeval_current_ofs(wrepl_socket->request_timeout, 0), 
-                                         wrepl_request_timeout_handler, req);
+       if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
        }
 
-       EVENT_FD_WRITEABLE(wrepl_socket->fde);
-       
-       return req;
+       state->assoc_ctx = packet->message.start_reply.assoc_ctx;
+       state->major_version = packet->message.start_reply.major_version;
 
-failed:
-       talloc_free(req);
-       return NULL;
+       tevent_req_done(req);
 }
 
 /*
-  receive a generic WINS replication reply
+  setup an association - recv
 */
-NTSTATUS wrepl_request_recv(struct wrepl_request *req,
-                           TALLOC_CTX *mem_ctx,
-                           struct wrepl_packet **packet)
+NTSTATUS wrepl_associate_recv(struct tevent_req *req,
+                             struct wrepl_associate *io)
 {
-       NTSTATUS status = wrepl_request_wait(req);
-       if (NT_STATUS_IS_OK(status)) {
-               *packet = talloc_steal(mem_ctx, req->packet);
+       struct wrepl_associate_state *state = tevent_req_data(req,
+                                             struct wrepl_associate_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
-       talloc_free(req);
-       return status;
+
+       io->out.assoc_ctx = state->assoc_ctx;
+       io->out.major_version = state->major_version;
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
-  a full WINS replication request/response
+  setup an association - sync api
 */
-NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
-                      TALLOC_CTX *mem_ctx,
-                      struct wrepl_packet *req_packet,
-                      struct wrepl_packet **reply_packet)
+NTSTATUS wrepl_associate(struct wrepl_socket *wrepl_socket,
+                        struct wrepl_associate *io)
 {
-       struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet);
-       return wrepl_request_recv(req, mem_ctx, reply_packet);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_associate_send(wrepl_socket, wrepl_socket->event.ctx,
+                                     wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_associate_recv(subreq, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
+struct wrepl_associate_stop_state {
+       struct wrepl_packet packet;
+       struct wrepl_send_ctrl ctrl;
+};
 
-/*
-  setup an association - send
-*/
-struct wrepl_request *wrepl_associate_send(struct wrepl_socket *wrepl_socket,
-                                          struct wrepl_associate *io)
+static void wrepl_associate_stop_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
+                                            struct tevent_context *ev,
+                                            struct wrepl_socket *wrepl_socket,
+                                            const struct wrepl_associate_stop *io)
 {
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct tevent_req *req;
+       struct wrepl_associate_stop_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_associate_stop_state);
+       if (req == NULL) {
+               return NULL;
+       };
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       state->packet.opcode                    = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                 = io->in.assoc_ctx;
+       state->packet.mess_type                 = WREPL_STOP_ASSOCIATION;
+       state->packet.message.stop.reason       = io->in.reason;
 
-       packet->opcode                      = WREPL_OPCODE_BITS;
-       packet->mess_type                   = WREPL_START_ASSOCIATION;
-       packet->message.start.minor_version = 2;
-       packet->message.start.major_version = 5;
+       if (io->in.reason == 0) {
+               state->ctrl.send_only                   = true;
+               state->ctrl.disconnect_after_send       = true;
+       }
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_associate_stop_done, req);
+
+       return req;
+}
 
-       req = wrepl_request_send(wrepl_socket, packet);
+static void wrepl_associate_stop_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_associate_stop_state *state = tevent_req_data(req,
+                                                  struct wrepl_associate_stop_state);
+       NTSTATUS status;
 
-       talloc_free(packet);
+       /* currently we don't care about a possible response */
+       status = wrepl_request_recv(subreq, state, NULL);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       return req;     
+       tevent_req_done(req);
 }
 
 /*
-  setup an association - recv
+  stop an association - recv
 */
-NTSTATUS wrepl_associate_recv(struct wrepl_request *req,
-                             struct wrepl_associate *io)
+NTSTATUS wrepl_associate_stop_recv(struct tevent_req *req,
+                                  struct wrepl_associate_stop *io)
 {
-       struct wrepl_packet *packet=NULL;
        NTSTATUS status;
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       if (packet->mess_type != WREPL_START_ASSOCIATION_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-       }
-       if (NT_STATUS_IS_OK(status)) {
-               io->out.assoc_ctx = packet->message.start_reply.assoc_ctx;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
-       talloc_free(packet);
-       return status;
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
   setup an association - sync api
 */
-NTSTATUS wrepl_associate(struct wrepl_socket *wrepl_socket,
-                        struct wrepl_associate *io)
+NTSTATUS wrepl_associate_stop(struct wrepl_socket *wrepl_socket,
+                             struct wrepl_associate_stop *io)
 {
-       struct wrepl_request *req = wrepl_associate_send(wrepl_socket, io);
-       return wrepl_associate_recv(req, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_associate_stop_send(wrepl_socket, wrepl_socket->event.ctx,
+                                          wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_associate_stop_recv(subreq, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
+struct wrepl_pull_table_state {
+       struct wrepl_packet packet;
+       uint32_t num_partners;
+       struct wrepl_wins_owner *partners;
+};
 
-/*
-  fetch the partner tables - send
-*/
-struct wrepl_request *wrepl_pull_table_send(struct wrepl_socket *wrepl_socket,
-                                           struct wrepl_pull_table *io)
+static void wrepl_pull_table_done(struct tevent_req *subreq);
+
+struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct wrepl_socket *wrepl_socket,
+                                        const struct wrepl_pull_table *io)
+{
+       struct tevent_req *req;
+       struct wrepl_pull_table_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_pull_table_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_pull_table_state);
+       if (req == NULL) {
+               return NULL;
+       };
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                         = io->in.assoc_ctx;
+       state->packet.mess_type                         = WREPL_REPLICATION;
+       state->packet.message.replication.command       = WREPL_REPL_TABLE_QUERY;
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_pull_table_done, req);
+
+       return req;
+}
+
+static void wrepl_pull_table_done(struct tevent_req *subreq)
 {
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_pull_table_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_table_state);
+       NTSTATUS status;
        struct wrepl_packet *packet;
-       struct wrepl_request *req;
+       struct wrepl_table *table;
+
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+       if (packet->mess_type != WREPL_REPLICATION) {
+               tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
+               return;
+       }
 
-       packet->opcode                      = WREPL_OPCODE_BITS;
-       packet->assoc_ctx                   = io->in.assoc_ctx;
-       packet->mess_type                   = WREPL_REPLICATION;
-       packet->message.replication.command = WREPL_REPL_TABLE_QUERY;
+       if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-       req = wrepl_request_send(wrepl_socket, packet);
+       table = &packet->message.replication.info.table;
 
-       talloc_free(packet);
+       state->num_partners = table->partner_count;
+       state->partners = talloc_move(state, &table->partners);
 
-       return req;     
+       tevent_req_done(req);
 }
 
-
 /*
   fetch the partner tables - recv
 */
-NTSTATUS wrepl_pull_table_recv(struct wrepl_request *req,
+NTSTATUS wrepl_pull_table_recv(struct tevent_req *req,
                               TALLOC_CTX *mem_ctx,
                               struct wrepl_pull_table *io)
 {
-       struct wrepl_packet *packet=NULL;
+       struct wrepl_pull_table_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_table_state);
        NTSTATUS status;
-       struct wrepl_table *table;
-       int i;
 
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       if (packet->mess_type != WREPL_REPLICATION) {
-               status = NT_STATUS_NETWORK_ACCESS_DENIED;
-       } else if (packet->message.replication.command != WREPL_REPL_TABLE_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
        }
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       table = &packet->message.replication.info.table;
-       io->out.num_partners = table->partner_count;
-       io->out.partners = talloc_steal(mem_ctx, table->partners);
-       for (i=0;i<io->out.num_partners;i++) {
-               talloc_steal(io->out.partners, io->out.partners[i].address);
-       }
+       io->out.num_partners = state->num_partners;
+       io->out.partners = talloc_move(mem_ctx, &state->partners);
 
-failed:
-       talloc_free(packet);
-       return status;
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
-
 /*
   fetch the partner table - sync api
 */
@@ -684,114 +954,191 @@ NTSTATUS wrepl_pull_table(struct wrepl_socket *wrepl_socket,
                          TALLOC_CTX *mem_ctx,
                          struct wrepl_pull_table *io)
 {
-       struct wrepl_request *req = wrepl_pull_table_send(wrepl_socket, io);
-       return wrepl_pull_table_recv(req, mem_ctx, io);
-}
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
 
+       subreq = wrepl_pull_table_send(mem_ctx, wrepl_socket->event.ctx,
+                                      wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_pull_table_recv(subreq, mem_ctx, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
+}
 
-/*
-  fetch the names for a WINS partner - send
-*/
-struct wrepl_request *wrepl_pull_names_send(struct wrepl_socket *wrepl_socket,
-                                           struct wrepl_pull_names *io)
-{
-       struct wrepl_packet *packet;
-       struct wrepl_request *req;
 
-       packet = talloc_zero(wrepl_socket, struct wrepl_packet);
-       if (packet == NULL) return NULL;
+struct wrepl_pull_names_state {
+       struct {
+               const struct wrepl_pull_names *io;
+       } caller;
+       struct wrepl_packet packet;
+       uint32_t num_names;
+       struct wrepl_name *names;
+};
 
-       packet->opcode                         = WREPL_OPCODE_BITS;
-       packet->assoc_ctx                      = io->in.assoc_ctx;
-       packet->mess_type                      = WREPL_REPLICATION;
-       packet->message.replication.command    = WREPL_REPL_SEND_REQUEST;
-       packet->message.replication.info.owner = io->in.partner;
+static void wrepl_pull_names_done(struct tevent_req *subreq);
 
-       req = wrepl_request_send(wrepl_socket, packet);
+struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct wrepl_socket *wrepl_socket,
+                                        const struct wrepl_pull_names *io)
+{
+       struct tevent_req *req;
+       struct wrepl_pull_names_state *state;
+       struct tevent_req *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_pull_names_send event context mismatch!");
+               return NULL;
+       }
 
-       talloc_free(packet);
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_pull_names_state);
+       if (req == NULL) {
+               return NULL;
+       };
+       state->caller.io = io;
+
+       state->packet.opcode                            = WREPL_OPCODE_BITS;
+       state->packet.assoc_ctx                         = io->in.assoc_ctx;
+       state->packet.mess_type                         = WREPL_REPLICATION;
+       state->packet.message.replication.command       = WREPL_REPL_SEND_REQUEST;
+       state->packet.message.replication.info.owner    = io->in.partner;
+
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, wrepl_pull_names_done, req);
 
-       return req;     
+       return req;
 }
 
-/*
-  fetch the names for a WINS partner - recv
-*/
-NTSTATUS wrepl_pull_names_recv(struct wrepl_request *req,
-                              TALLOC_CTX *mem_ctx,
-                              struct wrepl_pull_names *io)
+static void wrepl_pull_names_done(struct tevent_req *subreq)
 {
-       struct wrepl_packet *packet=NULL;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       struct wrepl_pull_names_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_names_state);
        NTSTATUS status;
-       int i;
+       struct wrepl_packet *packet;
+       uint32_t i;
 
-       status = wrepl_request_recv(req, req->wrepl_socket, &packet);
-       NT_STATUS_NOT_OK_RETURN(status);
-       if (packet->mess_type != WREPL_REPLICATION ||
-           packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
-               status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
+       status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       if (packet->mess_type != WREPL_REPLICATION) {
+               tevent_req_nterror(req, NT_STATUS_NETWORK_ACCESS_DENIED);
+               return;
        }
-       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       io->out.num_names = packet->message.replication.info.reply.num_names;
+       if (packet->message.replication.command != WREPL_REPL_SEND_REPLY) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-       status = NT_STATUS_NO_MEMORY;
+       state->num_names = packet->message.replication.info.reply.num_names;
 
-       io->out.names = talloc_array(packet, struct wrepl_name, io->out.num_names);
-       if (io->out.names == NULL) goto nomem;
+       state->names = talloc_array(state, struct wrepl_name, state->num_names);
+       if (tevent_req_nomem(state->names, req)) {
+               return;
+       }
 
        /* convert the list of names and addresses to a sane format */
-       for (i=0;i<io->out.num_names;i++) {
+       for (i=0; i < state->num_names; i++) {
                struct wrepl_wins_name *wname = &packet->message.replication.info.reply.names[i];
-               struct wrepl_name *name = &io->out.names[i];
+               struct wrepl_name *name = &state->names[i];
 
                name->name      = *wname->name;
-               talloc_steal(io->out.names, wname->name);
+               talloc_steal(state->names, wname->name);
                name->type      = WREPL_NAME_TYPE(wname->flags);
                name->state     = WREPL_NAME_STATE(wname->flags);
                name->node      = WREPL_NAME_NODE(wname->flags);
                name->is_static = WREPL_NAME_IS_STATIC(wname->flags);
                name->raw_flags = wname->flags;
                name->version_id= wname->id;
-               name->owner     = talloc_strdup(io->out.names, io->in.partner.address);
-               if (name->owner == NULL) goto nomem;
+               name->owner     = talloc_strdup(state->names,
+                                               state->caller.io->in.partner.address);
+               if (tevent_req_nomem(name->owner, req)) {
+                       return;
+               }
 
                /* trying to save 1 or 2 bytes on the wire isn't a good idea */
                if (wname->flags & 2) {
-                       int j;
+                       uint32_t j;
 
                        name->num_addresses = wname->addresses.addresses.num_ips;
-                       name->addresses = talloc_array(io->out.names, 
-                                                      struct wrepl_address, 
+                       name->addresses = talloc_array(state->names,
+                                                      struct wrepl_address,
                                                       name->num_addresses);
-                       if (name->addresses == NULL) goto nomem;
+                       if (tevent_req_nomem(name->addresses, req)) {
+                               return;
+                       }
+
                        for (j=0;j<name->num_addresses;j++) {
-                               name->addresses[j].owner = 
-                                       talloc_steal(name->addresses, 
-                                                    wname->addresses.addresses.ips[j].owner);
+                               name->addresses[j].owner =
+                                       talloc_move(name->addresses,
+                                                   &wname->addresses.addresses.ips[j].owner);
                                name->addresses[j].address = 
-                                       talloc_steal(name->addresses, 
-                                                    wname->addresses.addresses.ips[j].ip);
+                                       talloc_move(name->addresses,
+                                                   &wname->addresses.addresses.ips[j].ip);
                        }
                } else {
                        name->num_addresses = 1;
-                       name->addresses = talloc(io->out.names, struct wrepl_address);
-                       if (name->addresses == NULL) goto nomem;
-                       name->addresses[0].owner = talloc_strdup(name->addresses,io->in.partner.address);
-                       if (name->addresses[0].owner == NULL) goto nomem;
-                       name->addresses[0].address = talloc_steal(name->addresses,
-                                                                 wname->addresses.ip);
+                       name->addresses = talloc_array(state->names,
+                                                      struct wrepl_address,
+                                                      name->num_addresses);
+                       if (tevent_req_nomem(name->addresses, req)) {
+                               return;
+                       }
+
+                       name->addresses[0].owner = talloc_strdup(name->addresses, name->owner);
+                       if (tevent_req_nomem(name->addresses[0].owner, req)) {
+                               return;
+                       }
+                       name->addresses[0].address = talloc_move(name->addresses,
+                                                                &wname->addresses.ip);
                }
        }
 
-       talloc_steal(mem_ctx, io->out.names);
-       talloc_free(packet);
+       tevent_req_done(req);
+}
+
+/*
+  fetch the names for a WINS partner - recv
+*/
+NTSTATUS wrepl_pull_names_recv(struct tevent_req *req,
+                              TALLOC_CTX *mem_ctx,
+                              struct wrepl_pull_names *io)
+{
+       struct wrepl_pull_names_state *state = tevent_req_data(req,
+                                              struct wrepl_pull_names_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       io->out.num_names = state->num_names;
+       io->out.names = talloc_move(mem_ctx, &state->names);
+
+       tevent_req_received(req);
        return NT_STATUS_OK;
-nomem:
-       status = NT_STATUS_NO_MEMORY;
-failed:
-       talloc_free(packet);
-       return status;
 }
 
 
@@ -803,6 +1150,23 @@ NTSTATUS wrepl_pull_names(struct wrepl_socket *wrepl_socket,
                          TALLOC_CTX *mem_ctx,
                          struct wrepl_pull_names *io)
 {
-       struct wrepl_request *req = wrepl_pull_names_send(wrepl_socket, io);
-       return wrepl_pull_names_recv(req, mem_ctx, io);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_pull_names_send(mem_ctx, wrepl_socket->event.ctx,
+                                      wrepl_socket, io);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_pull_names_recv(subreq, mem_ctx, io);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }