r11048: r10539@SERNOX: metze | 2005-09-27 14:59:47 +0200
authorStefan Metzmacher <metze@samba.org>
Fri, 14 Oct 2005 13:02:00 +0000 (13:02 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 18:44:42 +0000 (13:44 -0500)
 fix the build for changes from SAMBA_4_0 branch

 metze
 r10541@SERNOX:  metze | 2005-09-27 15:05:33 +0200
 use a transaction when we allocate a new version

 metze
 r10549@SERNOX:  metze | 2005-09-27 18:58:37 +0200
 - add first start of wins pull replication
 - we do not yet apply records to our database but we fetch them correctly from our partners
   (we need conflict handling for this)
 - we also need to filter out our own records!

 metze
 r10568@SERNOX:  metze | 2005-09-28 11:33:04 +0200
 move composite helpers to a separate file, create a separate file for the conflict resolving logic

 metze
 r10571@SERNOX:  metze | 2005-09-28 12:00:17 +0200
 add forward declarations...to fix the build

 metze
 r10612@SERNOX:  metze | 2005-09-29 16:11:06 +0200
 we have a nbt_name now, and don't need to parse it

 metze
 r10614@SERNOX:  metze | 2005-09-29 16:38:35 +0200
 filter out our own records

 metze
 r10620@SERNOX:  metze | 2005-09-29 18:07:08 +0200
 - handle multiple addresses in WREPL_REPL_SEND_REPLY
 - make strings always valid talloc pointers

 metze
 r10621@SERNOX:  metze | 2005-09-29 18:09:41 +0200
 use debug level 2

 metze
 r10622@SERNOX:  metze | 2005-09-29 18:48:05 +0200
 - add one more debug message when we reply no record
 - fix min max logic

 metze
 r10623@SERNOX:  metze | 2005-09-29 20:49:06 +0200
 build fixes...

 metze
 r10629@SERNOX:  metze | 2005-09-30 00:11:41 +0200
 - use separate attributes for type, state, nodetype, is_static

 ... the winserver.c code needs some more updates to correctly
 create special group and multihomed registrations...

 metze
 r10640@SERNOX:  metze | 2005-09-30 04:07:34 +0200
 - add some short path for the composite helper functions
   they will be used in the next commit

 metze
 r10642@SERNOX:  metze | 2005-09-30 06:29:06 +0200
 fix the build

 metze
 r10655@SERNOX:  metze | 2005-09-30 17:36:49 +0200
 - implement the WREPL_REPL_UPDATE* and WREPL_REPL_INFORM*
   this includes the connection flipping into a client connection
   for WREPL_REPL_UPDATE*

 NOTE: I have not yet found out how to get the w2k server to use INFORM against samba4
       it uses inform against w2k and w2k3 but UPDATE against nt4 and samba4

 what's left now is to be able to initiate INFORM and UPDATE requests to notify
 our pull partners

 metze
 r10727@SERNOX:  metze | 2005-10-05 14:11:05 +0200
 fix the build

 metze
 r10770@SERNOX:  metze | 2005-10-06 16:56:01 +0200
 - move the table filling to a separate function, will be reused later
 - fix the build, wrepl_nbt_name fixes
 - remove state -> update_state

 metze
 r10771@SERNOX:  metze | 2005-10-06 17:04:48 +0200
 add a function to create a wreplsrv_in_connection from a client connection

 metze
 r10772@SERNOX:  metze | 2005-10-06 17:13:51 +0200
 - make the connection code more generic to handle the pull cached connection,
   push cached connection or given connections
 - when we don't use a cached connection, disconnection when a pull_cycle is done
 - fix the build and use the configured source ip

 metze
 r10773@SERNOX:  metze | 2005-10-06 17:18:49 +0200
 - add composite functions for push notification

 metze
 r10774@SERNOX:  metze | 2005-10-06 17:23:46 +0200
 - use periodic push notification, this is just for now
   as it needs to be configurable and be triggered when the local database
   has changes since the last notify
 - I also need to work out how to decide if the partner supports
   persistent connections and WREPL_REPL_INFORM* messages

 metze
 r10923@SERNOX:  metze | 2005-10-12 16:52:34 +0200
 fix the build because of conflicts with main SAMBA_4_0 tree

 metze
(This used to be commit 6d97dd6e50423758d081459ec551f4e04dfd818d)

source4/include/structs.h
source4/nbt_server/defense.c
source4/nbt_server/irpc.c
source4/nbt_server/nbt_server.h
source4/nbt_server/wins/winsdb.c
source4/nbt_server/wins/winsdb.h
source4/nbt_server/wins/winsserver.c
source4/wrepl_server/wrepl_apply_records.c [new file with mode: 0644]
source4/wrepl_server/wrepl_out_helpers.c [new file with mode: 0644]
source4/wrepl_server/wrepl_out_helpers.h [new file with mode: 0644]

index bcbf82e63b438e16169c02633c7457b292d25bbc..5fb69df7f6019dd03befdaba79999e8f20dd0487 100644 (file)
@@ -285,6 +285,7 @@ struct wrepl_associate;
 struct wrepl_associate_stop;
 struct wrepl_pull_table;
 struct wrepl_pull_names;
+struct wrepl_table;
 
 struct arcfour_state;
 
index f8c73c6a3a6505b157f1d69dbdea4533a93bb694..55a345e18dbdbfaa403cf606b3ce981acbae7380 100644 (file)
@@ -57,7 +57,7 @@ void nbtd_request_defense(struct nbt_name_socket *nbtsock,
 
        iname = nbtd_find_iname(iface, name, NBT_NM_ACTIVE);
        if (iname != NULL && 
-           !IS_GROUP_NAME(name, iname->nb_flags)) {
+           !(name->type == NBT_NAME_LOGON || iname->nb_flags & NBT_NM_GROUP)) {
                DEBUG(2,("Defending name %s on %s against %s\n",
                         nbt_name_string(packet, name), 
                         iface->bcast_address, src->addr));
index 94284184c15a1955d859c8b0c1488bbdd02c8673..14a274da5e0f04aa7c44064fe2eba95aac7838d4 100644 (file)
@@ -120,6 +120,7 @@ static NTSTATUS nbtd_getdcname(struct irpc_message *msg,
        struct nbt_ntlogon_sam_logon *r;
        struct nbt_dgram_socket *sock;
        struct nbt_name src, dst;
+       struct nbt_peer_socket dest;
        struct dgram_mailslot_handler *handler;
        NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
 
@@ -153,8 +154,10 @@ static NTSTATUS nbtd_getdcname(struct irpc_message *msg,
        make_nbt_name_client(&src, req->in.my_computername);
        make_nbt_name(&dst, req->in.domainname, 0x1c);
 
+       dest.addr = req->in.ip_address;
+       dest.port = 138;
        status = dgram_mailslot_ntlogon_send(sock, DGRAM_DIRECT_GROUP,
-                                            &dst, req->in.ip_address, 138,
+                                            &dst, &dest,
                                             &src, &p);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0, ("dgram_mailslot_ntlogon_send failed: %s\n",
index b9ed265b8811c1cf15dfae72965b3de5f6c7073e..71e384bd2cd8d8d875ce39e62a6498512f44f745 100644 (file)
@@ -21,6 +21,7 @@
 */
 
 #include "libcli/nbt/libnbt.h"
+#include "libcli/wrepl/winsrepl.h"
 #include "libcli/dgram/libdgram.h"
 #include "librpc/gen_ndr/ndr_irpc.h"
 #include "lib/messaging/irpc.h"
@@ -79,13 +80,9 @@ struct nbtd_server {
 
 
 /* check a condition on an incoming packet */
-#define NBTD_ASSERT_PACKET(packet, src_address, test) do { \
+#define NBTD_ASSERT_PACKET(packet, src, test) do { \
        if (!(test)) { \
-               nbtd_bad_packet(packet, src_address, #test); \
+               nbtd_bad_packet(packet, src, #test); \
                return; \
        } \
 } while (0)
-
-/* this copes with the nasty hack that is the type 0x1c name */
-#define IS_GROUP_NAME(name, nb_flags) \
-       ((name)->type != NBT_NAME_LOGON && (nb_flags & NBT_NM_GROUP))
index 49ffb13797f0625f8a9ab23b6505a3fe319a148e..50baa5089812cc041cc96bcb8f23c5ee1b73059e 100644 (file)
@@ -44,10 +44,8 @@ static uint64_t winsdb_allocate_version(struct wins_server *winssrv)
        dn = ldb_dn_explode(tmp_ctx, "CN=VERSION");
        if (!dn) goto failed;
 
-       ret |= ldb_msg_add_string(msg, "objectClass", "winsEntry");
-       ret |= ldb_msg_add_fmt(msg, "minVersion", "%llu", winssrv->min_version);
-       ret |= ldb_msg_add_fmt(msg, "maxVersion", "%llu", winssrv->max_version);
-       if (ret != 0) goto failed;
+       dn = ldb_dn_explode(tmp_ctx, "CN=VERSION");
+       if (!dn) goto failed;
 
        if (ret == 1) {
                maxVersion = ldb_msg_find_uint64(res[0], "maxVersion", 0);
@@ -71,6 +69,9 @@ static uint64_t winsdb_allocate_version(struct wins_server *winssrv)
        talloc_free(tmp_ctx);
        return maxVersion;
 
+       talloc_free(tmp_ctx);
+       return maxVersion;
+
 failed:
        talloc_free(tmp_ctx);
        return 0;
@@ -742,16 +743,19 @@ struct ldb_message *winsdb_message(struct ldb_context *ldb,
 
        msg->dn = winsdb_dn(msg, rec->name);
        if (msg->dn == NULL) goto failed;
-       ret |= ldb_msg_add_fmt(msg, "objectClass", "wins");
-       ret |= ldb_msg_add_fmt(msg, "active", "%u", rec->state);
-       ret |= ldb_msg_add_fmt(msg, "nbFlags", "0x%04x", rec->nb_flags);
-       ret |= ldb_msg_add_string(msg, "registeredBy", rec->registered_by);
-       ret |= ldb_msg_add_string(msg, "expires", 
+       ret |= ldb_msg_add_fmt(msg, "objectClass", "winsRecord");
+       ret |= ldb_msg_add_fmt(msg, "recordType", "%u", rec->type);
+       ret |= ldb_msg_add_fmt(msg, "recordState", "%u", rec->state);
+       ret |= ldb_msg_add_fmt(msg, "nodeType", "%u", rec->node);
+       ret |= ldb_msg_add_fmt(msg, "isStatic", "%u", rec->is_static);
+       ret |= ldb_msg_add_string(msg, "expireTime", 
                                  ldb_timestring(msg, rec->expire_time));
-       ret |= ldb_msg_add_fmt(msg, "version", "%llu", rec->version);
+       ret |= ldb_msg_add_fmt(msg, "versionID", "%llu", rec->version);
+       ret |= ldb_msg_add_string(msg, "winsOwner", rec->wins_owner);
        for (i=0;rec->addresses[i];i++) {
-               ret |= ldb_msg_add_string(msg, "address", rec->addresses[i]);
+               ret |= ldb_msg_add_winsdb_addr(msg, "address", rec->addresses[i]);
        }
+       ret |= ldb_msg_add_string(msg, "registeredBy", rec->registered_by);
        if (ret != 0) goto failed;
        return msg;
 
@@ -771,12 +775,12 @@ uint8_t winsdb_add(struct wins_server *winssrv, struct winsdb_record *rec)
        int trans = -1;
        int ret = 0;
 
-
        trans = ldb_transaction_start(ldb);
        if (trans != LDB_SUCCESS) goto failed;
 
        rec->version = winsdb_allocate_version(winssrv);
        if (rec->version == 0) goto failed;
+       rec->wins_owner = WINSDB_OWNER_LOCAL;
 
        msg = winsdb_message(winssrv->wins_db, rec, tmp_ctx);
        if (msg == NULL) goto failed;
@@ -849,9 +853,6 @@ uint8_t winsdb_delete(struct wins_server *winssrv, struct winsdb_record *rec)
        int trans;
        int ret;
 
-       if(!winsdb_remove_version(winssrv, rec->version))
-               goto failed;
-
        dn = winsdb_dn(tmp_ctx, rec->name);
        if (dn == NULL) goto failed;
 
index c775159a186306fdad9b9a25d16770d525d548bf..2ac1884063ac1647f17d5f42df75ed331e0ff054 100644 (file)
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
 
-enum wins_record_state {
-       WINS_REC_RELEASED =0,
-       WINS_REC_ACTIVE   =1
+#define WINSDB_OWNER_LOCAL     "0.0.0.0"
+#define WINSDB_GROUP_ADDRESS   "255.255.255.255"
+
+struct winsdb_addr {
+       const char *address;
+       const char *wins_owner;
+       time_t expire_time;
 };
 
 #define WINSDB_OWNER_LOCAL     "0.0.0.0"
@@ -39,10 +43,17 @@ struct winsdb_addr {
 */
 struct winsdb_record {
        struct nbt_name *name;
-       uint16_t nb_flags;
-       enum wins_record_state state;
+       enum wrepl_name_type type;
+       enum wrepl_name_state state;
+       enum wrepl_name_node node;
+       BOOL is_static;
        const char *wins_owner;
        time_t expire_time;
+       uint64_t version;
+       const char *wins_owner;
+       struct winsdb_addr **addresses;
+
+       /* only needed for debugging problems */
        const char *registered_by;
        struct winsdb_addr **addresses;
        uint64_t version;
index 8a5fabae4e84c01009a86fbf0deac2b33613bbfe..0f6717f4a92e53951c25940e951926f81bcd9d4a 100644 (file)
@@ -36,6 +36,21 @@ uint32_t wins_server_ttl(struct wins_server *winssrv, uint32_t ttl)
        return ttl;
 }
 
+static enum wrepl_name_type wrepl_type(uint16_t nb_flags, struct nbt_name *name, BOOL mhomed)
+{
+       /* this copes with the nasty hack that is the type 0x1c name */
+       if (name->type != NBT_NAME_LOGON) {
+               return WREPL_TYPE_SGROUP;
+       }
+       if (nb_flags & NBT_NM_GROUP) {
+               return WREPL_TYPE_GROUP;
+       }
+       if (mhomed) {
+               return WREPL_TYPE_MHOMED;
+       }
+       return WREPL_TYPE_UNIQUE;
+}
+
 /*
   register a new name with WINS
 */
@@ -51,6 +66,9 @@ static uint8_t wins_register_new(struct nbt_name_socket *nbtsock,
        uint16_t nb_flags = packet->additional[0].rdata.netbios.addresses[0].nb_flags;
        const char *address = packet->additional[0].rdata.netbios.addresses[0].ipaddr;
        struct winsdb_record rec;
+       enum wrepl_name_type type;
+       enum wrepl_name_node node;
+       BOOL mhomed = ((packet->operation & NBT_OPCODE) == NBT_OPCODE_MULTI_HOME_REG);
 
        rec.name          = name;
        rec.nb_flags      = nb_flags;
@@ -137,7 +155,7 @@ static void nbtd_winsserver_register(struct nbt_name_socket *nbtsock,
        }
 
        /* its an active name - first see if the registration is of the right type */
-       if ((rec->nb_flags & NBT_NM_GROUP) && !(nb_flags & NBT_NM_GROUP)) {
+       if ((rec->type == WREPL_TYPE_GROUP) && !(nb_flags & NBT_NM_GROUP)) {
                DEBUG(2,("WINS: Attempt to register unique name %s when group name is active\n",
                         nbt_name_string(packet, name)));
                rcode = NBT_RCODE_ACT;
@@ -158,6 +176,15 @@ static void nbtd_winsserver_register(struct nbt_name_socket *nbtsock,
                goto done;
        }
 
+       /*
+        * TODO: this complete functions needs a lot of work,
+        *       to handle special group and multiomed registrations
+        */
+       if (name->type == NBT_NAME_LOGON) {
+               wins_update_ttl(nbtsock, packet, rec, src);
+               goto done;
+       }
+
        /* if the registration is for an address that is currently active, then 
           just update the expiry time */
        if (winsdb_addr_list_check(rec->addresses, address)) {
@@ -246,7 +273,7 @@ static void nbtd_winsserver_release(struct nbt_name_socket *nbtsock,
                DEBUG(4,("WINS: released name %s at %s\n", nbt_name_string(rec, rec->name), address));
                winsdb_addr_list_remove(rec->addresses, address);
                if (rec->addresses[0] == NULL) {
-                       rec->state = WINS_REC_RELEASED;
+                       rec->state = WREPL_STATE_RELEASED;
                }
                winsdb_modify(winssrv, rec);
        }
diff --git a/source4/wrepl_server/wrepl_apply_records.c b/source4/wrepl_server/wrepl_apply_records.c
new file mode 100644 (file)
index 0000000..37c4ce2
--- /dev/null
@@ -0,0 +1,56 @@
+/* 
+   Unix SMB/CIFS implementation.
+   
+   WINS Replication server
+   
+   Copyright (C) Stefan Metzmacher     2005
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "dlinklist.h"
+#include "lib/events/events.h"
+#include "lib/socket/socket.h"
+#include "smbd/service_task.h"
+#include "smbd/service_stream.h"
+#include "lib/messaging/irpc.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "wrepl_server/wrepl_server.h"
+#include "wrepl_server/wrepl_out_helpers.h"
+#include "nbt_server/wins/winsdb.h"
+#include "ldb/include/ldb.h"
+#include "libcli/composite/composite.h"
+#include "libcli/wrepl/winsrepl.h"
+
+NTSTATUS wreplsrv_apply_records(struct wreplsrv_partner *partner, struct wreplsrv_pull_names_io *names_io)
+{
+       NTSTATUS status;
+
+       /* TODO: ! */
+       DEBUG(0,("TODO: apply records count[%u]:owner[%s]:min[%llu]:max[%llu]:partner[%s]\n",
+               names_io->out.num_names, names_io->in.owner.address,
+               names_io->in.owner.min_version, names_io->in.owner.max_version,
+               partner->address));
+
+       status = wreplsrv_add_table(partner->service,
+                                   partner->service,
+                                   &partner->service->table,
+                                   names_io->in.owner.address,
+                                   names_io->in.owner.max_version);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
+}
diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c
new file mode 100644 (file)
index 0000000..217bb87
--- /dev/null
@@ -0,0 +1,1043 @@
+/* 
+   Unix SMB/CIFS implementation.
+   
+   WINS Replication server
+   
+   Copyright (C) Stefan Metzmacher     2005
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "dlinklist.h"
+#include "lib/events/events.h"
+#include "lib/socket/socket.h"
+#include "smbd/service_task.h"
+#include "smbd/service_stream.h"
+#include "lib/messaging/irpc.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "wrepl_server/wrepl_server.h"
+#include "wrepl_server/wrepl_out_helpers.h"
+#include "nbt_server/wins/winsdb.h"
+#include "ldb/include/ldb.h"
+#include "libcli/composite/composite.h"
+#include "libcli/wrepl/winsrepl.h"
+
+enum wreplsrv_out_connect_stage {
+       WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
+       WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
+       WREPLSRV_OUT_CONNECT_STAGE_DONE
+};
+
+struct wreplsrv_out_connect_state {
+       enum wreplsrv_out_connect_stage stage;
+       struct composite_context *c;
+       struct wrepl_request *req;
+       struct wrepl_associate assoc_io;
+       enum winsrepl_partner_type type;
+       struct wreplsrv_out_connection *wreplconn;
+};
+
+static void wreplsrv_out_connect_handler(struct wrepl_request *req);
+
+static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
+{
+       NTSTATUS status;
+
+       status = wrepl_connect_recv(state->req);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
+       NT_STATUS_HAVE_NO_MEMORY(state->req);
+
+       state->req->async.fn            = wreplsrv_out_connect_handler;
+       state->req->async.private       = state;
+
+       state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
+
+       return NT_STATUS_OK;
+}
+
+static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
+{
+       NTSTATUS status;
+
+       status = wrepl_associate_recv(state->req, &state->assoc_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
+
+       if (state->type == WINSREPL_PARTNER_PUSH) {
+               state->wreplconn->partner->push.wreplconn = state->wreplconn;
+               talloc_steal(state->wreplconn->partner, state->wreplconn);
+       } else if (state->type == WINSREPL_PARTNER_PULL) {
+               state->wreplconn->partner->pull.wreplconn = state->wreplconn;
+               talloc_steal(state->wreplconn->partner, state->wreplconn);
+       }
+
+       state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
+
+       return NT_STATUS_OK;
+}
+
+static void wreplsrv_out_connect_handler(struct wrepl_request *req)
+{
+       struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
+                                                  struct wreplsrv_out_connect_state);
+       struct composite_context *c = state->c;
+
+       switch (state->stage) {
+       case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
+               c->status = wreplsrv_out_connect_wait_socket(state);
+               break;
+       case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
+               c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
+               c->state  = COMPOSITE_STATE_DONE;
+               break;
+       case WREPLSRV_OUT_CONNECT_STAGE_DONE:
+               c->status = NT_STATUS_INTERNAL_ERROR;
+       }
+
+       if (!NT_STATUS_IS_OK(c->status)) {
+               c->state = COMPOSITE_STATE_ERROR;
+       }
+
+       if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
+               c->async.fn(c);
+       }
+}
+
+static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
+                                                          enum winsrepl_partner_type type,
+                                                          struct wreplsrv_out_connection *wreplconn)
+{
+       struct composite_context *c = NULL;
+       struct wreplsrv_service *service = partner->service;
+       struct wreplsrv_out_connect_state *state = NULL;
+       struct wreplsrv_out_connection **wreplconnp = &wreplconn;
+       BOOL cached_connection = False;
+
+       c = talloc_zero(partner, struct composite_context);
+       if (!c) goto failed;
+
+       state = talloc_zero(c, struct wreplsrv_out_connect_state);
+       if (!state) goto failed;
+       state->c        = c;
+       state->type     = type;
+
+       c->state        = COMPOSITE_STATE_IN_PROGRESS;
+       c->event_ctx    = service->task->event_ctx;
+       c->private_data = state;
+
+       if (type == WINSREPL_PARTNER_PUSH) {
+               cached_connection       = True;
+               wreplconn               = partner->push.wreplconn;
+               wreplconnp              = &partner->push.wreplconn;
+       } else if (type == WINSREPL_PARTNER_PULL) {
+               cached_connection       = True;
+               wreplconn               = partner->pull.wreplconn;
+               wreplconnp              = &partner->pull.wreplconn;
+       }
+
+       /* we have a connection already, so use it */
+       if (wreplconn) {
+               if (!wreplconn->sock->dead) {
+                       state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
+                       state->wreplconn= wreplconn;
+                       composite_trigger_done(c);
+                       return c;
+               } else if (!cached_connection) {
+                       state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
+                       state->wreplconn= NULL;
+                       composite_trigger_done(c);
+                       return c;
+               } else {
+                       talloc_free(wreplconn);
+                       *wreplconnp = NULL;
+               }
+       }
+
+       wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
+       if (!wreplconn) goto failed;
+
+       wreplconn->service      = service;
+       wreplconn->partner      = partner;
+       wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx);
+       if (!wreplconn->sock) goto failed;
+
+       state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
+       state->wreplconn= wreplconn;
+       state->req      = wrepl_connect_send(wreplconn->sock,
+                                            partner->our_address,
+                                            partner->address);
+       if (!state->req) goto failed;
+
+       state->req->async.fn            = wreplsrv_out_connect_handler;
+       state->req->async.private       = state;
+
+       return c;
+failed:
+       talloc_free(c);
+       return NULL;
+}
+
+static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+                                         struct wreplsrv_out_connection **wreplconn)
+{
+       NTSTATUS status;
+
+       status = composite_wait(c);
+
+       if (NT_STATUS_IS_OK(status)) {
+               struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
+                                                          struct wreplsrv_out_connect_state);
+               if (state->wreplconn) {
+                       *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
+                       if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
+               } else {
+                       status = NT_STATUS_INVALID_CONNECTION;
+               }
+       }
+
+       talloc_free(c);
+       return status;
+       
+}
+
+enum wreplsrv_pull_table_stage {
+       WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
+       WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
+       WREPLSRV_PULL_TABLE_STAGE_DONE
+};
+
+struct wreplsrv_pull_table_state {
+       enum wreplsrv_pull_table_stage stage;
+       struct composite_context *c;
+       struct wrepl_request *req;
+       struct wrepl_pull_table table_io;
+       struct wreplsrv_pull_table_io *io;
+       struct composite_context *creq;
+       struct wreplsrv_out_connection *wreplconn;
+};
+
+static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
+
+static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
+{
+       NTSTATUS status;
+
+       status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
+       state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
+       NT_STATUS_HAVE_NO_MEMORY(state->req);
+
+       state->req->async.fn            = wreplsrv_pull_table_handler_req;
+       state->req->async.private       = state;
+
+       state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
+
+       return NT_STATUS_OK;
+}
+
+static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
+{
+       NTSTATUS status;
+
+       status = wrepl_pull_table_recv(state->req, state, &state->table_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
+
+       return NT_STATUS_OK;
+}
+
+static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
+{
+       struct composite_context *c = state->c;
+
+       switch (state->stage) {
+       case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
+               c->status = wreplsrv_pull_table_wait_connection(state);
+               break;
+       case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
+               c->status = wreplsrv_pull_table_wait_table_reply(state);
+               c->state  = COMPOSITE_STATE_DONE;
+               break;
+       case WREPLSRV_PULL_TABLE_STAGE_DONE:
+               c->status = NT_STATUS_INTERNAL_ERROR;
+       }
+
+       if (!NT_STATUS_IS_OK(c->status)) {
+               c->state = COMPOSITE_STATE_ERROR;
+       }
+
+       if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
+               c->async.fn(c);
+       }
+}
+
+static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
+{
+       struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
+                                                 struct wreplsrv_pull_table_state);
+       wreplsrv_pull_table_handler(state);
+       return;
+}
+
+static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
+{
+       struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private,
+                                                 struct wreplsrv_pull_table_state);
+       wreplsrv_pull_table_handler(state);
+       return;
+}
+
+struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
+{
+       struct composite_context *c = NULL;
+       struct wreplsrv_service *service = io->in.partner->service;
+       struct wreplsrv_pull_table_state *state = NULL;
+
+       c = talloc_zero(mem_ctx, struct composite_context);
+       if (!c) goto failed;
+
+       state = talloc_zero(c, struct wreplsrv_pull_table_state);
+       if (!state) goto failed;
+       state->c        = c;
+       state->io       = io;
+
+       c->state        = COMPOSITE_STATE_IN_PROGRESS;
+       c->event_ctx    = service->task->event_ctx;
+       c->private_data = state;
+
+       if (io->in.num_owners) {
+               state->table_io.out.num_partners        = io->in.num_owners;
+               state->table_io.out.partners            = io->in.owners;
+               state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
+               composite_trigger_done(c);
+               return c;
+       }
+
+       state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
+       state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
+       if (!state->creq) goto failed;
+
+       state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
+       state->creq->async.private_data = state;
+
+       return c;
+failed:
+       talloc_free(c);
+       return NULL;
+}
+
+NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+                                 struct wreplsrv_pull_table_io *io)
+{
+       NTSTATUS status;
+
+       status = composite_wait(c);
+
+       if (NT_STATUS_IS_OK(status)) {
+               struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
+                                                         struct wreplsrv_pull_table_state);
+               io->out.num_owners      = state->table_io.out.num_partners;
+               io->out.owners          = state->table_io.out.partners;
+               talloc_reference(mem_ctx, state->table_io.out.partners);
+       }
+
+       talloc_free(c);
+       return status;  
+}
+
+enum wreplsrv_pull_names_stage {
+       WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
+       WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
+       WREPLSRV_PULL_NAMES_STAGE_DONE
+};
+
+struct wreplsrv_pull_names_state {
+       enum wreplsrv_pull_names_stage stage;
+       struct composite_context *c;
+       struct wrepl_request *req;
+       struct wrepl_pull_names pull_io;
+       struct wreplsrv_pull_names_io *io;
+       struct composite_context *creq;
+       struct wreplsrv_out_connection *wreplconn;
+};
+
+static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
+
+static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
+{
+       NTSTATUS status;
+
+       status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
+       state->pull_io.in.partner       = state->io->in.owner;
+       state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
+       NT_STATUS_HAVE_NO_MEMORY(state->req);
+
+       state->req->async.fn            = wreplsrv_pull_names_handler_req;
+       state->req->async.private       = state;
+
+       state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * stage WAIT_SEND_REPLY: the name reply arrived; receive the records
+ * into state->pull_io (allocated on state) and mark the operation done
+ */
+static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
+{
+       NTSTATUS status;
+
+       status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * central state machine dispatcher: run the handler for the current
+ * stage, translate the result into the composite context state and
+ * trigger the caller's completion callback when finished
+ */
+static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
+{
+       struct composite_context *c = state->c;
+
+       switch (state->stage) {
+       case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
+               c->status = wreplsrv_pull_names_wait_connection(state);
+               break;
+       case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
+               c->status = wreplsrv_pull_names_wait_send_reply(state);
+               c->state  = COMPOSITE_STATE_DONE;
+               break;
+       case WREPLSRV_PULL_NAMES_STAGE_DONE:
+               /* being called again after DONE is a logic error */
+               c->status = NT_STATUS_INTERNAL_ERROR;
+       }
+
+       if (!NT_STATUS_IS_OK(c->status)) {
+               c->state = COMPOSITE_STATE_ERROR;
+       }
+
+       /* notify the caller once we are in a terminal state */
+       if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
+               c->async.fn(c);
+       }
+}
+
+/* trampoline: composite (connect) callback -> common state machine */
+static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
+{
+       struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
+                                                 struct wreplsrv_pull_names_state);
+       wreplsrv_pull_names_handler(state);
+       return;
+}
+
+/* trampoline: wrepl request callback -> common state machine */
+static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
+{
+       struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private,
+                                                 struct wreplsrv_pull_names_state);
+       wreplsrv_pull_names_handler(state);
+       return;
+}
+
+/*
+ * start an async pull-names operation against a partner.
+ *
+ * If io->in.wreplconn is given, an existing connection is reused
+ * (partner type NONE); otherwise a PULL connection is established.
+ * Returns a composite context to wait on, or NULL on allocation failure.
+ */
+struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
+{
+       struct composite_context *c = NULL;
+       struct wreplsrv_service *service = io->in.partner->service;
+       struct wreplsrv_pull_names_state *state = NULL;
+       enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
+
+       if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
+
+       c = talloc_zero(mem_ctx, struct composite_context);
+       if (!c) goto failed;
+
+       state = talloc_zero(c, struct wreplsrv_pull_names_state);
+       if (!state) goto failed;
+       state->c        = c;
+       state->io       = io;
+
+       c->state        = COMPOSITE_STATE_IN_PROGRESS;
+       c->event_ctx    = service->task->event_ctx;
+       c->private_data = state;
+
+       state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
+       state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
+       if (!state->creq) goto failed;
+
+       state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
+       state->creq->async.private_data = state;
+
+       return c;
+failed:
+       /* state is a child of c, so this frees everything */
+       talloc_free(c);
+       return NULL;
+}
+
+/*
+ * wait for a pull-names operation and hand the received records to
+ * the caller. On success the names array is referenced onto mem_ctx,
+ * so it stays valid after the composite context is freed below.
+ */
+NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+                                 struct wreplsrv_pull_names_io *io)
+{
+       NTSTATUS status;
+
+       status = composite_wait(c);
+
+       if (NT_STATUS_IS_OK(status)) {
+               struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
+                                                         struct wreplsrv_pull_names_state);
+               io->out.num_names       = state->pull_io.out.num_names;
+               io->out.names           = state->pull_io.out.names;
+               talloc_reference(mem_ctx, state->pull_io.out.names);
+       }
+
+       talloc_free(c);
+       return status;
+       
+}
+
+/*
+ * stages of a full pull replication cycle:
+ * fetch the partner's owner table, pull records per owner,
+ * then (optionally) stop the association
+ */
+enum wreplsrv_pull_cycle_stage {
+       WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
+       WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
+       WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
+       WREPLSRV_PULL_CYCLE_STAGE_DONE
+};
+
+/* per-cycle state; 'current' is the index of the owner being processed */
+struct wreplsrv_pull_cycle_state {
+       enum wreplsrv_pull_cycle_stage stage;
+       struct composite_context *c;
+       struct wreplsrv_pull_cycle_io *io;
+       struct wreplsrv_pull_table_io table_io;
+       uint32_t current;
+       struct wreplsrv_pull_names_io names_io;
+       struct composite_context *creq;
+       struct wrepl_associate_stop assoc_stop_io;
+       struct wrepl_request *req;
+};
+
+static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
+static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
+
+/*
+ * find the next owner (starting at state->current) whose records we
+ * need, and start an async pull-names for it.
+ *
+ * returns STATUS_MORE_ENTRIES when a pull was started,
+ * NT_STATUS_OK when no more owners need pulling.
+ */
+static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
+{
+       struct wreplsrv_owner *current_owner;
+       struct wreplsrv_owner *local_owner;
+       uint32_t i;
+       uint64_t old_max_version = 0;
+       BOOL do_pull = False;
+
+       for (i=state->current; i < state->table_io.out.num_owners; i++) {
+               current_owner = wreplsrv_find_owner(state->io->in.partner->pull.table,
+                                                   state->table_io.out.owners[i].address);
+
+               local_owner = wreplsrv_find_owner(state->io->in.partner->service->table,
+                                                 state->table_io.out.owners[i].address);
+               /*
+                * this means we are ourself the current owner,
+                * and we don't want replicate ourself
+                */
+               if (!current_owner) continue;
+
+               /*
+                * this means we don't have any records of this owner
+                * so fetch them
+                */
+               if (!local_owner) {
+                       do_pull         = True;
+                       
+                       break;
+               }
+
+               /*
+                * this means the remote partner has some new records of this owner
+                * fetch them
+                */
+               if (current_owner->owner.max_version > local_owner->owner.max_version) {
+                       do_pull         = True;
+                       old_max_version = local_owner->owner.max_version;
+                       break;
+               }
+       }
+       /* remember where to continue on the next call */
+       state->current = i;
+
+       if (do_pull) {
+               /* current_owner is valid here: do_pull is only set on the
+                * break paths above, where current_owner != NULL */
+               state->names_io.in.partner              = state->io->in.partner;
+               state->names_io.in.wreplconn            = state->io->in.wreplconn;
+               state->names_io.in.owner                = current_owner->owner;
+               state->names_io.in.owner.min_version    = old_max_version;
+               state->creq = wreplsrv_pull_names_send(state, &state->names_io);
+               NT_STATUS_HAVE_NO_MEMORY(state->creq);
+
+               state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
+               state->creq->async.private_data = state;
+
+               return STATUS_MORE_ENTRIES;
+       }
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * wrapper around next_owner_do_work() that maps its result onto the
+ * cycle stage: DONE when nothing is left, WAIT_SEND_REPLIES while a
+ * pull is in flight. When the cycle is finished on a caller-supplied
+ * connection, an associate-stop is sent first and the stage becomes
+ * WAIT_STOP_ASSOC.
+ */
+static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
+{
+       NTSTATUS status;
+
+       status = wreplsrv_pull_cycle_next_owner_do_work(state);
+       if (NT_STATUS_IS_OK(status)) {
+               state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
+       } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
+               /* a pull-names request is running; not an error */
+               state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
+               status = NT_STATUS_OK;
+       }
+
+       if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
+               state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
+               state->assoc_stop_io.in.reason          = 0;
+               state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
+               NT_STATUS_HAVE_NO_MEMORY(state->req);
+
+               state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
+               state->req->async.private       = state;
+
+               state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
+       }
+
+       return status;
+}
+
+/*
+ * stage WAIT_TABLE_REPLY: receive the partner's owner table, merge it
+ * into our per-partner pull table (skipping owners that are one of our
+ * own addresses), then start pulling records for the first owner
+ */
+static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
+{
+       NTSTATUS status;
+       uint32_t i;
+
+       status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       /* update partner table */
+       for (i=0; i < state->table_io.out.num_owners; i++) {
+               BOOL is_our_addr;
+
+               /* filter out our own records */
+               is_our_addr = wreplsrv_is_our_address(state->io->in.partner->service,
+                                                     state->table_io.out.owners[i].address);
+               if (is_our_addr) continue;
+
+               status = wreplsrv_add_table(state->io->in.partner->service,
+                                           state->io->in.partner, 
+                                           &state->io->in.partner->pull.table,
+                                           state->table_io.out.owners[i].address,
+                                           state->table_io.out.owners[i].max_version);
+               NT_STATUS_NOT_OK_RETURN(status);
+       }
+
+       status = wreplsrv_pull_cycle_next_owner_wrapper(state);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return status;
+}
+
+/*
+ * apply the records pulled for one owner to our database, then
+ * release the name array and reset names_io for the next owner
+ */
+static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
+{
+       NTSTATUS status;
+
+       status = wreplsrv_apply_records(state->io->in.partner, &state->names_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       talloc_free(state->names_io.out.names);
+       ZERO_STRUCT(state->names_io);
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * stage WAIT_SEND_REPLIES: one owner's records arrived; apply them
+ * and move on to the next owner (or finish the cycle)
+ */
+static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
+{
+       NTSTATUS status;
+
+       status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       /*
+        * TODO: this should maybe an async call,
+        *       because we may need some network access
+        *       for conflict resolving
+        */
+       status = wreplsrv_pull_cycle_apply_records(state);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       status = wreplsrv_pull_cycle_next_owner_wrapper(state);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return status;
+}
+
+/*
+ * stage WAIT_STOP_ASSOC: the associate-stop reply arrived;
+ * the cycle is complete
+ */
+static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
+{
+       NTSTATUS status;
+
+       status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
+
+       return status;
+}
+
+/*
+ * central state machine dispatcher for the pull cycle; translates
+ * stage results into composite context state and fires the caller's
+ * completion callback when the cycle ends (success or error)
+ */
+static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
+{
+       struct composite_context *c = state->c;
+
+       switch (state->stage) {
+       case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
+               c->status = wreplsrv_pull_cycle_wait_table_reply(state);
+               break;
+       case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
+               c->status = wreplsrv_pull_cycle_wait_send_replies(state);
+               break;
+       case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
+               c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
+               break;
+       case WREPLSRV_PULL_CYCLE_STAGE_DONE:
+               /* being called again after DONE is a logic error */
+               c->status = NT_STATUS_INTERNAL_ERROR;
+       }
+
+       if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
+               c->state  = COMPOSITE_STATE_DONE;
+       }
+
+       if (!NT_STATUS_IS_OK(c->status)) {
+               c->state = COMPOSITE_STATE_ERROR;
+       }
+
+       if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
+               c->async.fn(c);
+       }
+}
+
+/* trampoline: composite callback -> common cycle state machine */
+static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
+{
+       struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
+                                                 struct wreplsrv_pull_cycle_state);
+       wreplsrv_pull_cycle_handler(state);
+       return;
+}
+
+/* trampoline: wrepl request callback -> common cycle state machine */
+static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
+{
+       struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private,
+                                                 struct wreplsrv_pull_cycle_state);
+       wreplsrv_pull_cycle_handler(state);
+       return;
+}
+
+/*
+ * start a full async pull replication cycle against a partner,
+ * beginning with a pull of the partner's owner table.
+ * Returns a composite context to wait on, or NULL on allocation failure.
+ */
+struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
+{
+       struct composite_context *c = NULL;
+       struct wreplsrv_service *service = io->in.partner->service;
+       struct wreplsrv_pull_cycle_state *state = NULL;
+
+       c = talloc_zero(mem_ctx, struct composite_context);
+       if (!c) goto failed;
+
+       state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
+       if (!state) goto failed;
+       state->c        = c;
+       state->io       = io;
+
+       c->state        = COMPOSITE_STATE_IN_PROGRESS;
+       c->event_ctx    = service->task->event_ctx;
+       c->private_data = state;
+
+       state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
+       state->table_io.in.partner      = io->in.partner;
+       state->table_io.in.num_owners   = io->in.num_owners;
+       state->table_io.in.owners       = io->in.owners;
+       state->creq = wreplsrv_pull_table_send(state, &state->table_io);
+       if (!state->creq) goto failed;
+
+       state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
+       state->creq->async.private_data = state;
+
+       return c;
+failed:
+       /* state is a child of c, so this frees everything */
+       talloc_free(c);
+       return NULL;
+}
+
+/* wait for a pull cycle to finish and return its final status */
+NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
+{
+       NTSTATUS status;
+
+       status = composite_wait(c);
+
+       talloc_free(c);
+       return status;
+}
+
+/*
+ * stages of the async push-notify operation:
+ * connect to the partner, then wait for the update handshake
+ */
+enum wreplsrv_push_notify_stage {
+       WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
+       WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
+       WREPLSRV_PUSH_NOTIFY_STAGE_DONE
+};
+
+/* per-operation state; 'command' selects UPDATE/UPDATE2/INFORM/INFORM2 */
+struct wreplsrv_push_notify_state {
+       enum wreplsrv_push_notify_stage stage;
+       struct composite_context *c;
+       struct wreplsrv_push_notify_io *io;
+       enum wrepl_replication_cmd command;
+       BOOL full_table;
+       struct wrepl_request *req;
+       struct wrepl_packet req_packet;
+       struct wrepl_packet *rep_packet;
+       struct composite_context *creq;
+       struct wreplsrv_out_connection *wreplconn;
+};
+
+static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
+static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
+
+/*
+ * send a WREPL_REPL_UPDATE/UPDATE2 notification: build a replication
+ * packet carrying our owner table, queue it, then convert the outgoing
+ * connection into an incoming one so the partner can pull from us on
+ * the same socket.
+ *
+ * NOTE(review): the socket and the encoded request buffer are stolen
+ * onto 'state' before the outgoing connection structures are freed,
+ * presumably to keep them alive across the ownership handoff to the
+ * merged incoming connection -- confirm against wrepl_request_send()
+ * and wreplsrv_in_connection_merge() semantics.
+ */
+static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
+{
+       struct wreplsrv_service *service = state->io->in.partner->service;
+       struct wrepl_packet *req = &state->req_packet;
+       struct wrepl_replication *repl_out = &state->req_packet.message.replication;
+       struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
+       struct wreplsrv_in_connection *wrepl_in;
+       NTSTATUS status;
+       struct socket_context *sock;
+       struct data_blob_list_item *update_rep;
+       const char *our_ip;
+       DATA_BLOB update_blob;
+
+       req->opcode     = WREPL_OPCODE_BITS;
+       req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
+       req->mess_type  = WREPL_REPLICATION;
+
+       repl_out->command = state->command;
+
+       our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
+       NT_STATUS_HAVE_NO_MEMORY(our_ip);
+
+       /* fill in our owner table (full or partial, per state->full_table) */
+       status = wreplsrv_fill_wrepl_table(service, state, table_out,
+                                          our_ip, our_ip, state->full_table);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->req = wrepl_request_send(state->wreplconn->sock, req);
+       NT_STATUS_HAVE_NO_MEMORY(state->req);
+
+       /* take ownership of the raw socket away from the wrepl socket */
+       sock = state->wreplconn->sock->sock;
+       talloc_steal(state, state->wreplconn->sock->sock);
+       state->wreplconn->sock->sock = NULL;
+
+       /* keep the already-encoded request bytes alive on 'state' */
+       update_blob = state->req->buffer;
+       talloc_steal(state, state->req->buffer.data);
+
+       talloc_free(state->wreplconn->sock);
+       state->wreplconn->sock = NULL;
+
+       /* re-register the raw socket as an incoming connection */
+       status = wreplsrv_in_connection_merge(state->io->in.partner,
+                                             sock, &wrepl_in);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
+       wrepl_in->assoc_ctx.our_ctx     = 0;
+
+       update_rep = talloc(wrepl_in, struct data_blob_list_item);
+       NT_STATUS_HAVE_NO_MEMORY(update_rep);
+
+       update_rep->blob = update_blob;
+       talloc_steal(update_rep, update_blob.data);
+
+       talloc_free(state->wreplconn);
+       state->wreplconn = NULL;
+
+       /* wake up the write handler if the queue was empty */
+       if (!wrepl_in->send_queue) {
+               EVENT_FD_WRITEABLE(wrepl_in->conn->event.fde);
+       }
+       DLIST_ADD_END(wrepl_in->send_queue, update_rep, struct data_blob_list_item *);
+
+       state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * send a WREPL_REPL_INFORM/INFORM2 notification carrying our owner
+ * table; the connection stays an outgoing one and no reply payload
+ * is expected (send_only)
+ */
+static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
+{
+       struct wreplsrv_service *service = state->io->in.partner->service;
+       struct wrepl_packet *req = &state->req_packet;
+       struct wrepl_replication *repl_out = &state->req_packet.message.replication;
+       struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
+       NTSTATUS status;
+       const char *our_ip;
+
+       req->opcode     = WREPL_OPCODE_BITS;
+       req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
+       req->mess_type  = WREPL_REPLICATION;
+
+       repl_out->command = state->command;
+
+       our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
+       NT_STATUS_HAVE_NO_MEMORY(our_ip);
+
+       status = wreplsrv_fill_wrepl_table(service, state, table_out,
+                                          our_ip, our_ip, state->full_table);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->req = wrepl_request_send(state->wreplconn->sock, req);
+       NT_STATUS_HAVE_NO_MEMORY(state->req);
+
+       /* we won't get a reply to a inform message */
+       state->req->send_only           = True;
+       state->req->async.fn            = wreplsrv_push_notify_handler_req;
+       state->req->async.private       = state;
+
+       state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE;
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * stage WAIT_CONNECT: the connection is established; dispatch to the
+ * update or inform path depending on the chosen replication command.
+ * UPDATE/INFORM announce the full table, UPDATE2/INFORM2 a partial one.
+ */
+static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
+{
+       NTSTATUS status;
+
+       status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       switch (state->command) {
+       case WREPL_REPL_UPDATE:
+               state->full_table = True;
+               return wreplsrv_push_notify_update(state);
+       case WREPL_REPL_UPDATE2:
+               state->full_table = False;
+               return wreplsrv_push_notify_update(state);
+       case WREPL_REPL_INFORM:
+               state->full_table = True;
+               return wreplsrv_push_notify_inform(state);
+       case WREPL_REPL_INFORM2:
+               state->full_table = False;
+               return wreplsrv_push_notify_inform(state);
+       default:
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       /* not reached; all cases return above */
+       return NT_STATUS_INTERNAL_ERROR;
+}
+
+/*
+ * stage WAIT_UPDATE: not implemented yet -- NT_STATUS_FOOBAR is a
+ * placeholder that makes the composite request fail (TODO)
+ */
+static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state)
+{
+       return NT_STATUS_FOOBAR;
+}
+
+/*
+ * central state machine dispatcher for push-notify; translates stage
+ * results into composite context state and fires the caller's
+ * completion callback when finished
+ */
+static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
+{
+       struct composite_context *c = state->c;
+
+       switch (state->stage) {
+       case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
+               c->status = wreplsrv_push_notify_wait_connect(state);
+               break;
+       case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE:
+               c->status = wreplsrv_push_notify_wait_update(state);
+               break;
+       case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
+               /* being called again after DONE is a logic error */
+               c->status = NT_STATUS_INTERNAL_ERROR;
+       }
+
+       if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
+               c->state  = COMPOSITE_STATE_DONE;
+       }
+
+       if (!NT_STATUS_IS_OK(c->status)) {
+               c->state = COMPOSITE_STATE_ERROR;
+       }
+
+       if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
+               c->async.fn(c);
+       }
+}
+
+/* trampoline: composite callback -> common push-notify state machine */
+static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
+{
+       struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
+                                                  struct wreplsrv_push_notify_state);
+       wreplsrv_push_notify_handler(state);
+       return;
+}
+
+/* trampoline: wrepl request callback -> common push-notify state machine */
+static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
+{
+       struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
+                                                  struct wreplsrv_push_notify_state);
+       wreplsrv_push_notify_handler(state);
+       return;
+}
+
+/*
+ * start an async push notification to a partner.
+ *
+ * io->in.inform selects INFORM (cacheable PUSH connection) versus
+ * UPDATE (uncached connection); io->in.propagate selects the *2
+ * command variants. Returns a composite context, or NULL on
+ * allocation failure.
+ */
+struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
+{
+       struct composite_context *c = NULL;
+       struct wreplsrv_service *service = io->in.partner->service;
+       struct wreplsrv_push_notify_state *state = NULL;
+       enum winsrepl_partner_type partner_type;
+
+       c = talloc_zero(mem_ctx, struct composite_context);
+       if (!c) goto failed;
+
+       state = talloc_zero(c, struct wreplsrv_push_notify_state);
+       if (!state) goto failed;
+       state->c        = c;
+       state->io       = io;
+
+       if (io->in.inform) {
+               /* we can cache the connection in partner->push->wreplconn */
+               partner_type = WINSREPL_PARTNER_PUSH;
+               if (io->in.propagate) {
+                       state->command  = WREPL_REPL_INFORM2;
+               } else {
+                       state->command  = WREPL_REPL_INFORM;
+               }
+       } else {
+               /* we can NOT cache the connection */
+               partner_type = WINSREPL_PARTNER_NONE;
+               if (io->in.propagate) {
+                       state->command  = WREPL_REPL_UPDATE2;
+               } else {
+                       state->command  = WREPL_REPL_UPDATE;
+               }       
+       }
+
+       c->state        = COMPOSITE_STATE_IN_PROGRESS;
+       c->event_ctx    = service->task->event_ctx;
+       c->private_data = state;
+
+       state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
+       state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
+       if (!state->creq) goto failed;
+
+       state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
+       state->creq->async.private_data = state;
+
+       return c;
+failed:
+       /* state is a child of c, so this frees everything */
+       talloc_free(c);
+       return NULL;
+}
+
+/* wait for a push notification to finish and return its final status */
+NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
+{
+       NTSTATUS status;
+
+       status = composite_wait(c);
+
+       talloc_free(c);
+       return status;
+}
diff --git a/source4/wrepl_server/wrepl_out_helpers.h b/source4/wrepl_server/wrepl_out_helpers.h
new file mode 100644 (file)
index 0000000..ead24fc
--- /dev/null
@@ -0,0 +1,62 @@
+/* 
+   Unix SMB/CIFS implementation.
+   
+   WINS Replication server
+   
+   Copyright (C) Stefan Metzmacher     2005
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+/* in/out parameters for wreplsrv_pull_table_send/recv:
+ * fetch a partner's owner table */
+struct wreplsrv_pull_table_io {
+       struct {
+               struct wreplsrv_partner *partner;
+               uint32_t num_owners;
+               struct wrepl_wins_owner *owners;
+       } in;
+       struct {
+               uint32_t num_owners;
+               struct wrepl_wins_owner *owners;
+       } out;
+};
+
+/* in/out parameters for wreplsrv_pull_names_send/recv:
+ * fetch the records of one owner; wreplconn may be NULL to
+ * open a new connection */
+struct wreplsrv_pull_names_io {
+       struct {
+               struct wreplsrv_partner *partner;
+               struct wreplsrv_out_connection *wreplconn;
+               struct wrepl_wins_owner owner;
+       } in;
+       struct {
+               uint32_t num_names;
+               struct wrepl_name *names;
+       } out;
+};
+
+/* in parameters for wreplsrv_pull_cycle_send/recv:
+ * run a full pull replication cycle against a partner */
+struct wreplsrv_pull_cycle_io {
+       struct {
+               struct wreplsrv_partner *partner;
+               uint32_t num_owners;
+               struct wrepl_wins_owner *owners;
+               struct wreplsrv_out_connection *wreplconn;
+       } in;
+};
+
+/* in parameters for wreplsrv_push_notify_send/recv:
+ * inform selects INFORM vs UPDATE, propagate the *2 variants */
+struct wreplsrv_push_notify_io {
+       struct {
+               struct wreplsrv_partner *partner;
+               BOOL inform;
+               BOOL propagate;
+       } in;
+};