r17516: Change helper function names to make more clear what they are meant to do
[jelmer/samba4-debian.git] / source / wrepl_server / wrepl_server.c
index e2da68d2b49d2ed3c8cd3321b8fb3368e2fc9312..d18f7e8517af76cc2b9bfb69efc4b0b6487d0612 100644 (file)
 
 #include "includes.h"
 #include "dlinklist.h"
-#include "lib/events/events.h"
-#include "lib/socket/socket.h"
 #include "smbd/service_task.h"
-#include "smbd/service_stream.h"
+#include "smbd/service.h"
 #include "lib/messaging/irpc.h"
-#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "librpc/gen_ndr/winsrepl.h"
 #include "wrepl_server/wrepl_server.h"
 #include "nbt_server/wins/winsdb.h"
 #include "ldb/include/ldb.h"
+#include "ldb/include/ldb_errors.h"
+#include "auth/auth.h"
+#include "db_wrap.h"
 
-void wreplsrv_terminate_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
+static struct ldb_context *wins_config_db_connect(TALLOC_CTX *mem_ctx)
 {
-       stream_terminate_connection(wreplconn->conn, reason);
+       return ldb_wrap_connect(mem_ctx, private_path(mem_ctx, lp_wins_config_url()),
+                               system_session(mem_ctx), NULL, 0, NULL);
 }
 
-static struct wreplsrv_partner *wreplsrv_find_partner(struct wreplsrv_service *service, const char *peer_addr)
+static uint64_t wins_config_db_get_seqnumber(struct ldb_context *ldb)
 {
-       struct wreplsrv_partner *cur;
-
-       for (cur = service->partners; cur; cur = cur->next) {
-               if (strcmp(cur->address, peer_addr) == 0) {
-                       return cur;
-               }
-       }
-
-       return NULL;
-}
-
-/*
-  called when we get a new connection
-*/
-static void wreplsrv_accept(struct stream_connection *conn)
-{
-       struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
-       struct wreplsrv_in_connection *wreplconn;
-       const char *peer_ip;
+       int ret;
+       struct ldb_dn *dn;
+       struct ldb_result *res = NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ldb);
+       uint64_t seqnumber = 0;
 
-       wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
-       if (!wreplconn) {
-               stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
-               return;
-       }
+       dn = ldb_dn_explode(tmp_ctx, "@BASEINFO");
+       if (!dn) goto failed;
 
-       wreplconn->conn         = conn;
-       wreplconn->service      = service;
-       wreplconn->our_ip       = socket_get_my_addr(conn->socket, wreplconn);
-       if (!wreplconn->our_ip) {
-               wreplsrv_terminate_connection(wreplconn, "wreplsrv_accept: out of memory");
-               return;
-       }
+       /* find the record in the WINS database */
+       ret = ldb_search(ldb, dn, LDB_SCOPE_BASE, 
+                        NULL, NULL, &res);
+       if (ret != LDB_SUCCESS) goto failed;
+       talloc_steal(tmp_ctx, res);
+       if (res->count > 1) goto failed;
 
-       peer_ip = socket_get_peer_addr(conn->socket, wreplconn);
-       if (!peer_ip) {
-               wreplsrv_terminate_connection(wreplconn, "wreplsrv_accept: out of memory");
-               return;
+       if (res->count == 1) {
+               seqnumber = ldb_msg_find_attr_as_uint64(res->msgs[0], "sequenceNumber", 0);
        }
 
-       wreplconn->partner      = wreplsrv_find_partner(service, peer_ip);
-
-       conn->private = wreplconn;
-
-       irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
+failed:
+       talloc_free(tmp_ctx);
+       return seqnumber;
 }
 
 /*
-  receive some data on a WREPL connection
+  open winsdb
 */
-static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
+static NTSTATUS wreplsrv_open_winsdb(struct wreplsrv_service *service)
 {
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
-       struct wreplsrv_in_call *call;
-       DATA_BLOB packet_in_blob;
-       DATA_BLOB packet_out_blob;
-       struct wrepl_wrap packet_out_wrap;
-       struct data_blob_list_item *rep;
-       NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
-       size_t nread;
-
-       /* avoid recursion, because of half async code */
-       if (wreplconn->processing) {
-               EVENT_FD_NOT_READABLE(conn->event.fde);
-               return;
-       }
-
-       if (wreplconn->partial.length == 0) {
-               wreplconn->partial = data_blob_talloc(wreplconn, NULL, 4);
-               if (wreplconn->partial.data == NULL) {
-                       status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               wreplconn->partial_read = 0;
-       }
-
-       /* read in the packet length */
-       if (wreplconn->partial_read < 4) {
-               uint32_t packet_length;
-
-               status = socket_recv(conn->socket, 
-                                    wreplconn->partial.data + wreplconn->partial_read,
-                                    4 - wreplconn->partial_read,
-                                    &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) goto failed;
-               if (!NT_STATUS_IS_OK(status)) return;
-
-               wreplconn->partial_read += nread;
-               if (wreplconn->partial_read != 4) return;
-
-               packet_length = RIVAL(wreplconn->partial.data, 0) + 4;
-
-               wreplconn->partial.data = talloc_realloc(wreplconn, wreplconn->partial.data, 
-                                                        uint8_t, packet_length);
-               if (wreplconn->partial.data == NULL) {
-                       status = NT_STATUS_NO_MEMORY;
-                       goto failed;
-               }
-               wreplconn->partial.length = packet_length;
-       }
-
-       /* read in the body */
-       status = socket_recv(conn->socket, 
-                            wreplconn->partial.data + wreplconn->partial_read,
-                            wreplconn->partial.length - wreplconn->partial_read,
-                            &nread, 0);
-       if (NT_STATUS_IS_ERR(status)) goto failed;
-       if (!NT_STATUS_IS_OK(status)) return;
-
-       wreplconn->partial_read += nread;
-       if (wreplconn->partial_read != wreplconn->partial.length) return;
-
-       packet_in_blob.data = wreplconn->partial.data + 4;
-       packet_in_blob.length = wreplconn->partial.length - 4;
-
-       call = talloc_zero(wreplconn, struct wreplsrv_in_call);
-       if (!call) {
-               status = NT_STATUS_NO_MEMORY;
-               goto failed;
+       service->wins_db     = winsdb_connect(service, WINSDB_HANDLE_CALLER_WREPL);
+       if (!service->wins_db) {
+               return NT_STATUS_INTERNAL_DB_ERROR;
        }
-       call->wreplconn = wreplconn;
 
-       /* we have a full request - parse it */
-       status = ndr_pull_struct_blob(&packet_in_blob,
-                                     call, &call->req_packet,
-                                     (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(2,("Failed to parse incoming WINS-Replication packet - %s\n",
-                        nt_errstr(status)));
-               DEBUG(10,("packet length %u\n", wreplconn->partial.length));
-               NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
-               goto failed;
+       service->config.ldb = wins_config_db_connect(service);
+       if (!service->config.ldb) {
+               return NT_STATUS_INTERNAL_DB_ERROR;
        }
 
-       /*
-        * we have parsed the request, so we can reset the wreplconn->partial_read,
-        * maybe we could also free wreplconn->partial, but for now we keep it,
-        * and overwrite it the next time
-        */
-       wreplconn->partial_read = 0;
+       /* the default renew interval is 6 days */
+       service->config.renew_interval    = lp_parm_int(-1,"wreplsrv","renew_interval", 6*24*60*60);
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Received WINS-Replication packet of length %u\n", wreplconn->partial.length));
-               NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
-       }
+       /* the default tombstone (extinction) interval is 6 days */
+       service->config.tombstone_interval= lp_parm_int(-1,"wreplsrv","tombstone_interval", 6*24*60*60);
 
-       /* actually process the request */
-       wreplconn->processing = True;
-       status = wreplsrv_in_call(call);
-       wreplconn->processing = False;
-       if (NT_STATUS_IS_ERR(status)) goto failed;
-       if (!NT_STATUS_IS_OK(status)) {
-               /* w2k just ignores invalid packets, so we do */
-               DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
-               talloc_free(call);
-               return;
-       }
+       /* the default tombstone (extinction) timeout is 1 day */
+       service->config.tombstone_timeout = lp_parm_int(-1,"wreplsrv","tombstone_timeout", 1*24*60*60);
 
-       /* and now encode the reply */
-       packet_out_wrap.packet = call->rep_packet;
-       status = ndr_push_struct_blob(&packet_out_blob, call, &packet_out_wrap,
-                                     (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       /* the default tombstone extra timeout is 3 days */
+       service->config.tombstone_extra_timeout = lp_parm_int(-1,"wreplsrv","tombstone_extra_timeout", 3*24*60*60);
 
-       if (DEBUGLVL(10)) {
-               DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
-               NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
-       }
+       /* the default verify interval is 24 days */
+       service->config.verify_interval   = lp_parm_int(-1,"wreplsrv","verify_interval", 24*24*60*60);
 
-       rep = talloc(wreplconn, struct data_blob_list_item);
-       if (!rep) {
-               status = NT_STATUS_NO_MEMORY;
-               goto failed;
-       }
+       /* the default scavenging interval is 'renew_interval/2' */
+       service->config.scavenging_interval=lp_parm_int(-1,"wreplsrv","scavenging_interval",
+                                                       service->config.renew_interval/2);
 
-       rep->blob = packet_out_blob;
-       talloc_steal(rep, packet_out_blob.data);
-       /* we don't need the call anymore */
-       talloc_free(call);
+       /* the maximun interval to the next periodic processing event */
+       service->config.periodic_interval = lp_parm_int(-1,"wreplsrv","periodic_interval", 15);
 
-       if (!wreplconn->send_queue) {
-               EVENT_FD_WRITEABLE(conn->event.fde);
-       }
-       DLIST_ADD_END(wreplconn->send_queue, rep, struct data_blob_list_item *);
-
-       if (wreplconn->terminate) {
-               EVENT_FD_NOT_READABLE(conn->event.fde);
-       } else {
-               EVENT_FD_READABLE(conn->event.fde);
-       }
-       return;
-
-failed:
-       wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
+       return NT_STATUS_OK;
 }
 
-/*
-  called when we can write to a connection
-*/
-static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
+struct wreplsrv_partner *wreplsrv_find_partner(struct wreplsrv_service *service, const char *peer_addr)
 {
-       struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
-       NTSTATUS status;
-
-       while (wreplconn->send_queue) {
-               struct data_blob_list_item *rep = wreplconn->send_queue;
-               size_t sendlen;
-
-               status = socket_send(conn->socket, &rep->blob, &sendlen, 0);
-               if (NT_STATUS_IS_ERR(status)) goto failed;
-               if (!NT_STATUS_IS_OK(status)) return;
-
-               rep->blob.length -= sendlen;
-               rep->blob.data   += sendlen;
+       struct wreplsrv_partner *cur;
 
-               if (rep->blob.length == 0) {
-                       DLIST_REMOVE(wreplconn->send_queue, rep);
-                       talloc_free(rep);
+       for (cur = service->partners; cur; cur = cur->next) {
+               if (strcmp(cur->address, peer_addr) == 0) {
+                       return cur;
                }
        }
 
-       if (wreplconn->terminate) {
-               wreplsrv_terminate_connection(wreplconn, "connection terminated after all pending packets are send");
-               return;
-       }
-
-       EVENT_FD_NOT_WRITEABLE(conn->event.fde);
-       return;
-
-failed:
-       wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
-}
-
-static const struct stream_server_ops wreplsrv_stream_ops = {
-       .name                   = "wreplsrv",
-       .accept_connection      = wreplsrv_accept,
-       .recv_handler           = wreplsrv_recv,
-       .send_handler           = wreplsrv_send,
-};
-
-/*
-  open winsdb
-*/
-static NTSTATUS wreplsrv_open_winsdb(struct wreplsrv_service *service)
-{
-       service->wins_db     = winsdb_connect(service);
-       if (!service->wins_db) {
-               return NT_STATUS_INTERNAL_DB_ERROR;
-       }
-
-       return NT_STATUS_OK;
+       return NULL;
 }
 
 /*
   load our replication partners
 */
-static NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service)
+NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service)
 {
-       struct ldb_message **res = NULL;
+       struct wreplsrv_partner *partner;
+       struct ldb_result *res = NULL;
        int ret;
        TALLOC_CTX *tmp_ctx = talloc_new(service);
        int i;
+       uint64_t new_seqnumber;
+
+       new_seqnumber = wins_config_db_get_seqnumber(service->config.ldb);
+
+       /* if it's not the first run and nothing changed we're done */
+       if (service->config.seqnumber != 0 && service->config.seqnumber == new_seqnumber) {
+               return NT_STATUS_OK;
+       }
+
+       service->config.seqnumber = new_seqnumber;
 
        /* find the record in the WINS database */
-       ret = ldb_search(service->wins_db, ldb_dn_explode(tmp_ctx, "CN=PARTNERS"), LDB_SCOPE_ONELEVEL,
+       ret = ldb_search(service->config.ldb, ldb_dn_explode(tmp_ctx, "CN=PARTNERS"), LDB_SCOPE_SUBTREE,
                         "(objectClass=wreplPartner)", NULL, &res);
-       if (res != NULL) {
-               talloc_steal(tmp_ctx, res);
+       if (ret != LDB_SUCCESS) goto failed;
+       talloc_steal(tmp_ctx, res);
+
+       /* first disable all existing partners */
+       for (partner=service->partners; partner; partner = partner->next) {
+               partner->type = WINSREPL_PARTNER_NONE;
        }
-       if (ret < 0) goto failed;
-       if (ret == 0) goto done;
 
-       for (i=0; i < ret; i++) {
-               struct wreplsrv_partner *partner;
+       for (i=0; i < res->count; i++) {
+               const char *address;
 
-               partner = talloc(service, struct wreplsrv_partner);
-               if (partner == NULL) goto failed;
+               address = ldb_msg_find_attr_as_string(res->msgs[i], "address", NULL);
+               if (!address) {
+                       goto failed;
+               }
 
-               partner->address        = ldb_msg_find_string(res[i], "address", NULL);
-               if (!partner->address) goto failed;
-               partner->name           = ldb_msg_find_string(res[i], "name", partner->address);
-               partner->type           = ldb_msg_find_int(res[i], "type", WINSREPL_PARTNER_BOTH);
-               partner->pull.interval  = ldb_msg_find_int(res[i], "pullInterval", WINSREPL_DEFAULT_PULL_INTERVAL);
-               partner->our_address    = ldb_msg_find_string(res[i], "ourAddress", NULL);
+               partner = wreplsrv_find_partner(service, address);
+               if (partner) {
+                       if (partner->name != partner->address) {
+                               talloc_free(discard_const(partner->name));
+                       }
+                       partner->name = NULL;
+                       talloc_free(discard_const(partner->our_address));
+                       partner->our_address = NULL;
+
+                       /* force rescheduling of pulling */
+                       partner->pull.next_run = timeval_zero();
+               } else {
+                       partner = talloc_zero(service, struct wreplsrv_partner);
+                       if (partner == NULL) goto failed;
+
+                       partner->service = service;
+                       partner->address = address;
+                       talloc_steal(partner, partner->address);
+
+                       DLIST_ADD_END(service->partners, partner, struct wreplsrv_partner *);
+               }
 
-               talloc_steal(partner, partner->address);
+               partner->name                   = ldb_msg_find_attr_as_string(res->msgs[i], "name", partner->address);
                talloc_steal(partner, partner->name);
+               partner->our_address            = ldb_msg_find_attr_as_string(res->msgs[i], "ourAddress", NULL);
                talloc_steal(partner, partner->our_address);
 
-               DLIST_ADD(service->partners, partner);
+               partner->type                   = ldb_msg_find_attr_as_uint(res->msgs[i], "type", WINSREPL_PARTNER_BOTH);
+               partner->pull.interval          = ldb_msg_find_attr_as_uint(res->msgs[i], "pullInterval",
+                                                                   WINSREPL_DEFAULT_PULL_INTERVAL);
+               partner->pull.retry_interval    = ldb_msg_find_attr_as_uint(res->msgs[i], "pullRetryInterval",
+                                                                   WINSREPL_DEFAULT_PULL_RETRY_INTERVAL);
+               partner->push.change_count      = ldb_msg_find_attr_as_uint(res->msgs[i], "pushChangeCount",
+                                                                   WINSREPL_DEFAULT_PUSH_CHANGE_COUNT);
+               partner->push.use_inform        = ldb_msg_find_attr_as_uint(res->msgs[i], "pushUseInform", False);
+
+               DEBUG(3,("wreplsrv_load_partners: found partner: %s type: 0x%X\n",
+                       partner->address, partner->type));
        }
-done:
+
+       DEBUG(2,("wreplsrv_load_partners: %u partners found: wins_config_db seqnumber %llu\n",
+               res->count, service->config.seqnumber));
+
        talloc_free(tmp_ctx);
        return NT_STATUS_OK;
 failed:
@@ -338,42 +209,74 @@ failed:
        return NT_STATUS_FOOBAR;
 }
 
-uint64_t wreplsrv_local_max_version(struct wreplsrv_service *service)
+NTSTATUS wreplsrv_fill_wrepl_table(struct wreplsrv_service *service,
+                                  TALLOC_CTX *mem_ctx,
+                                  struct wrepl_table *table_out,
+                                  const char *initiator,
+                                  BOOL full_table)
 {
-       int ret;
-       struct ldb_context *ldb = service->wins_db;
-       struct ldb_dn *dn;
-       struct ldb_message **res = NULL;
-       TALLOC_CTX *tmp_ctx = talloc_new(service);
-       uint64_t maxVersion = 0;
+       struct wreplsrv_owner *cur;
+       uint32_t i = 0;
 
-       dn = ldb_dn_explode(tmp_ctx, "CN=VERSION");
-       if (!dn) goto failed;
+       table_out->partner_count        = 0;
+       table_out->partners             = NULL;
+       table_out->initiator            = initiator;
 
-       /* find the record in the WINS database */
-       ret = ldb_search(ldb, dn, LDB_SCOPE_BASE, 
-                        NULL, NULL, &res);
-       if (res != NULL) {
-               talloc_steal(tmp_ctx, res);
+       for (cur = service->table; cur; cur = cur->next) {
+               if (full_table) {
+                       table_out->partner_count++;
+                       continue;
+               }
+
+               if (strcmp(initiator, cur->owner.address) != 0) continue;
+
+               table_out->partner_count++;
+               break;
        }
-       if (ret < 0) goto failed;
-       if (ret > 1) goto failed;
 
-       if (ret == 1) {
-               maxVersion = ldb_msg_find_uint64(res[0], "maxVersion", 0);
+       table_out->partners = talloc_array(mem_ctx, struct wrepl_wins_owner, table_out->partner_count);
+       NT_STATUS_HAVE_NO_MEMORY(table_out->partners);
+
+       for (cur = service->table; cur && i < table_out->partner_count; cur = cur->next) {
+               /*
+                * if it's our local entry
+                * update the max version
+                */
+               if (cur == service->owner) {
+                       cur->owner.max_version = winsdb_get_maxVersion(service->wins_db);
+               }
+
+               if (full_table) {
+                       table_out->partners[i] = cur->owner;
+                       i++;
+                       continue;
+               }
+
+               if (strcmp(initiator, cur->owner.address) != 0) continue;
+
+               table_out->partners[i] = cur->owner;
+               i++;
+               break;
        }
 
-failed:
-       talloc_free(tmp_ctx);
-       return maxVersion;
+       return NT_STATUS_OK;
 }
 
-struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const char *wins_owner)
+struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_service *service,
+                                          struct wreplsrv_owner *table,
+                                          const char *wins_owner)
 {
        struct wreplsrv_owner *cur;
 
        for (cur = table; cur; cur = cur->next) {
                if (strcmp(cur->owner.address, wins_owner) == 0) {
+                       /*
+                        * if it's our local entry
+                        * update the max version
+                        */
+                       if (cur == service->owner) {
+                               cur->owner.max_version = winsdb_get_maxVersion(service->wins_db);
+                       }
                        return cur;
                }
        }
@@ -385,18 +288,18 @@ struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const c
  update the wins_owner_table max_version, if the given version is the highest version
  if no entry for the wins_owner exists yet, create one
 */
-static NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
-                                  TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table,
-                                  const char *wins_owner, uint64_t version)
+NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
+                           TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table,
+                           const char *wins_owner, uint64_t version)
 {
        struct wreplsrv_owner *table = *_table;
        struct wreplsrv_owner *cur;
 
-       if (strcmp(WINSDB_OWNER_LOCAL, wins_owner) == 0) {
-               return NT_STATUS_OK;
+       if (!wins_owner || strcmp(wins_owner, "0.0.0.0") == 0) {
+               wins_owner = service->wins_db->local_owner;
        }
 
-       cur = wreplsrv_find_owner(table, wins_owner);
+       cur = wreplsrv_find_owner(service, table, wins_owner);
 
        /* if it doesn't exists yet, create one */
        if (!cur) {
@@ -411,15 +314,25 @@ static NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
 
                cur->partner            = wreplsrv_find_partner(service, wins_owner);
 
-               DLIST_ADD(table, cur);
+               DLIST_ADD_END(table, cur, struct wreplsrv_owner *);
                *_table = table;
        }
 
        /* the min_version is always 0 here, and won't be updated */
 
-       /* if the given version is higher the then current nax_version, update */
+       /* if the given version is higher than the current max_version, update */
        if (cur->owner.max_version < version) {
                cur->owner.max_version = version;
+               /* if it's for our local db, we need to update the wins.ldb too */
+               if (cur == service->owner) {
+                       uint64_t ret;
+                       ret = winsdb_set_maxVersion(service->wins_db, cur->owner.max_version);
+                       if (ret != cur->owner.max_version) {
+                               DEBUG(0,("winsdb_set_maxVersion(%llu) failed: %llu\n",
+                                       cur->owner.max_version, ret));
+                               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+                       }
+               }
        }
 
        return NT_STATUS_OK;
@@ -430,44 +343,69 @@ static NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
 */
 static NTSTATUS wreplsrv_load_table(struct wreplsrv_service *service)
 {
-       struct ldb_message **res = NULL;
+       struct ldb_result *res = NULL;
        int ret;
        NTSTATUS status;
        TALLOC_CTX *tmp_ctx = talloc_new(service);
+       struct ldb_context *ldb = service->wins_db->ldb;
        int i;
+       struct wreplsrv_owner *local_owner;
        const char *wins_owner;
        uint64_t version;
        const char * const attrs[] = {
                "winsOwner",
-               "version",
+               "versionID",
                NULL
        };
 
-       /* find the record in the WINS database */
-       ret = ldb_search(service->wins_db, NULL, LDB_SCOPE_SUBTREE,
-                        "(objectClass=wins)", attrs, &res);
-       if (res != NULL) {
-               talloc_steal(tmp_ctx, res);
+       /*
+        * make sure we have our local entry in the list,
+        * but we set service->owner when we're done
+        * to avoid to many calls to wreplsrv_local_max_version()
+        */
+       status = wreplsrv_add_table(service,
+                                   service, &service->table,
+                                   service->wins_db->local_owner, 0);
+       if (!NT_STATUS_IS_OK(status)) goto failed;
+       local_owner = wreplsrv_find_owner(service, service->table, service->wins_db->local_owner);
+       if (!local_owner) {
+               status = NT_STATUS_INTERNAL_ERROR;
+               goto failed;
        }
+
+       /* find the record in the WINS database */
+       ret = ldb_search(ldb, NULL, LDB_SCOPE_SUBTREE,
+                        "(objectClass=winsRecord)", attrs, &res);
        status = NT_STATUS_INTERNAL_DB_CORRUPTION;
-       if (ret < 0) goto failed;
-       if (ret == 0) goto done;
-
-       for (i=0; i < ret; i++) {
-               wins_owner     = ldb_msg_find_string(res[i], "winsOwner", NULL);
-               version        = ldb_msg_find_uint64(res[i], "version", 0);
-
-               if (wins_owner) { 
-                       status = wreplsrv_add_table(service,
-                                                   service, &service->table,
-                                                   wins_owner, version);
-                       if (!NT_STATUS_IS_OK(status)) goto failed;
-               }
-               talloc_free(res[i]);
+       if (ret != LDB_SUCCESS) goto failed;
+       talloc_steal(tmp_ctx, res);
+
+       for (i=0; i < res->count; i++) {
+               wins_owner     = ldb_msg_find_attr_as_string(res->msgs[i], "winsOwner", NULL);
+               version        = ldb_msg_find_attr_as_uint64(res->msgs[i], "versionID", 0);
 
-               /* TODO: what's abut the per address owners? */
+               status = wreplsrv_add_table(service,
+                                           service, &service->table,
+                                           wins_owner, version);
+               if (!NT_STATUS_IS_OK(status)) goto failed;
+               talloc_free(res->msgs[i]);
        }
-done:
+
+       /*
+        * this makes sure we call wreplsrv_local_max_version() before returning in
+        * wreplsrv_find_owner()
+        */
+       service->owner = local_owner;
+
+       /*
+        * this makes sure the maxVersion in the database is updated,
+        * with the highest version we found, if this is higher than the current stored one
+        */
+       status = wreplsrv_add_table(service,
+                                   service, &service->table,
+                                   service->wins_db->local_owner, local_owner->owner.max_version);
+       if (!NT_STATUS_IS_OK(status)) goto failed;
+
        talloc_free(tmp_ctx);
        return NT_STATUS_OK;
 failed:
@@ -491,58 +429,6 @@ static NTSTATUS wreplsrv_setup_partners(struct wreplsrv_service *service)
        return NT_STATUS_OK;
 }
 
-/*
-  startup the wrepl port 42 server sockets
-*/
-static NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service)
-{
-       NTSTATUS status;
-       struct task_server *task = service->task;
-       const struct model_ops *model_ops;
-       const char *address;
-       uint16_t port = WINS_REPLICATION_PORT;
-
-       /* within the wrepl task we want to be a single process, so
-          ask for the single process model ops and pass these to the
-          stream_setup_socket() call. */
-       model_ops = process_model_byname("single");
-       if (!model_ops) {
-               DEBUG(0,("Can't find 'single' process model_ops"));
-               return NT_STATUS_INTERNAL_ERROR;
-       }
-
-       if (lp_interfaces() && lp_bind_interfaces_only()) {
-               int num_interfaces = iface_count();
-               int i;
-
-               /* We have been given an interfaces line, and been 
-                  told to only bind to those interfaces. Create a
-                  socket per interface and bind to only these.
-               */
-               for(i = 0; i < num_interfaces; i++) {
-                       address = iface_n_ip(i);
-                       status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
-                                                    "ipv4", address, &port, service);
-                       if (!NT_STATUS_IS_OK(status)) {
-                               DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
-                                        address, port, nt_errstr(status)));
-                               return status;
-                       }
-               }
-       } else {
-               address = lp_socket_address();
-               status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
-                                            "ipv4", address, &port, service);
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
-                                address, port, nt_errstr(status)));
-                       return status;
-               }
-       }
-
-       return NT_STATUS_OK;
-}
-
 /*
   startup the wrepl task
 */
@@ -551,13 +437,16 @@ static void wreplsrv_task_init(struct task_server *task)
        NTSTATUS status;
        struct wreplsrv_service *service;
 
+       task_server_set_title(task, "task[wreplsrv]");
+
        service = talloc_zero(task, struct wreplsrv_service);
        if (!service) {
                task_server_terminate(task, "wreplsrv_task_init: out of memory");
                return;
        }
-       service->task = task;
-       task->private = service;
+       service->task           = task;
+       service->startup_time   = timeval_current();
+       task->private           = service;
 
        /*
         * setup up all partners, and open the winsdb
@@ -587,6 +476,12 @@ static void wreplsrv_task_init(struct task_server *task)
                return;
        }
 
+       status = wreplsrv_setup_periodic(service);
+       if (!NT_STATUS_IS_OK(status)) {
+               task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_periodic() failed");
+               return;
+       }
+
        irpc_add_name(task->msg_ctx, "wrepl_server");
 }
 
@@ -595,6 +490,10 @@ static void wreplsrv_task_init(struct task_server *task)
  */
 static NTSTATUS wreplsrv_init(struct event_context *event_ctx, const struct model_ops *model_ops)
 {
+       if (!lp_wins_support()) {
+               return NT_STATUS_OK;
+       }
+
        return task_server_startup(event_ctx, model_ops, wreplsrv_task_init);
 }