wrepl_server: make 'use inform' the default and autofallback against old servers
[kai/samba.git] / source4 / wrepl_server / wrepl_in_call.c
index b3406c96bacf8182976411834cee0a8961d7d451..a4f18ff2da44579a882031adc5b92646b69f7aab 100644 (file)
@@ -7,7 +7,7 @@
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
-#include "dlinklist.h"
 #include "lib/events/events.h"
 #include "lib/socket/socket.h"
-#include "smbd/service_task.h"
 #include "smbd/service_stream.h"
-#include "lib/messaging/irpc.h"
-#include "librpc/gen_ndr/ndr_winsrepl.h"
-#include "librpc/gen_ndr/ndr_nbt.h"
 #include "libcli/wrepl/winsrepl.h"
 #include "wrepl_server/wrepl_server.h"
-#include "wrepl_server/wrepl_out_helpers.h"
 #include "libcli/composite/composite.h"
 #include "nbt_server/wins/winsdb.h"
 #include "lib/ldb/include/ldb.h"
 #include "lib/ldb/include/ldb_errors.h"
+#include "system/time.h"
 
 static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
 {
@@ -55,12 +49,19 @@ static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
                return NT_STATUS_OK;
        }
 
+/*
+ * it seems that we don't know all details about the start_association
+ * to support replication with NT4 (it sends 1.1 instead of 5.2)
+ * we ignore the version numbers until we know all details
+ */
+#if 0
        if (start->minor_version != 2 || start->major_version != 5) {
                /* w2k terminate the connection if the versions doesn't match */
                return NT_STATUS_UNKNOWN_REVISION;
        }
+#endif
 
-       call->wreplconn->assoc_ctx.stopped      = False;
+       call->wreplconn->assoc_ctx.stopped      = false;
        call->wreplconn->assoc_ctx.our_ctx      = WREPLSRV_VALID_ASSOC_CTX;
        call->wreplconn->assoc_ctx.peer_ctx     = start->assoc_ctx;
 
@@ -69,6 +70,19 @@ static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
        start_reply->minor_version              = 2;
        start_reply->major_version              = 5;
 
+       /*
+        * nt4 uses 41 bytes for the start_association call
+        * so do it the same and as we don't know the meanings of this bytes
+        * we just send zeros and nt4, w2k and w2k3 seems to be happy with this
+        *
+        * if we don't do this nt4 uses an old version of the wins replication protocol
+        * and that would break nt4 <-> samba replication
+        */
+       call->rep_packet.padding                = data_blob_talloc(call, NULL, 21);
+       NT_STATUS_HAVE_NO_MEMORY(call->rep_packet.padding.data);
+
+       memset(call->rep_packet.padding.data, 0, call->rep_packet.padding.length);
+
        return NT_STATUS_OK;
 }
 
@@ -76,7 +90,7 @@ static NTSTATUS wreplsrv_in_stop_assoc_ctx(struct wreplsrv_in_call *call)
 {
        struct wrepl_stop *stop_out             = &call->rep_packet.message.stop;
 
-       call->wreplconn->assoc_ctx.stopped      = True;
+       call->wreplconn->assoc_ctx.stopped      = true;
 
        call->rep_packet.mess_type              = WREPL_STOP_ASSOCIATION;
        stop_out->reason                        = 4;
@@ -106,7 +120,7 @@ static NTSTATUS wreplsrv_in_stop_association(struct wreplsrv_in_call *call)
        }
 
        /* this will cause to not receive packets anymore and terminate the connection if the reply is send */
-       call->wreplconn->terminate = True;
+       call->terminate_after_send = true;
        return wreplsrv_in_stop_assoc_ctx(call);
 }
 
@@ -115,12 +129,11 @@ static NTSTATUS wreplsrv_in_table_query(struct wreplsrv_in_call *call)
        struct wreplsrv_service *service = call->wreplconn->service;
        struct wrepl_replication *repl_out = &call->rep_packet.message.replication;
        struct wrepl_table *table_out = &call->rep_packet.message.replication.info.table;
-       const char *our_ip = call->wreplconn->our_ip;
 
        repl_out->command = WREPL_REPL_TABLE_REPLY;
 
        return wreplsrv_fill_wrepl_table(service, call, table_out,
-                                        our_ip, our_ip, True);
+                                        service->wins_db->local_owner, true);
 }
 
 static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1,
@@ -132,7 +145,6 @@ static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1,
 }
 
 static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx,
-                                         const char *our_address,
                                          struct wrepl_wins_name *name,
                                          struct winsdb_record *rec)
 {
@@ -143,7 +155,7 @@ static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx,
        talloc_steal(mem_ctx, rec->name);
 
        name->id                = rec->version;
-       name->unknown           = WINSDB_GROUP_ADDRESS;
+       name->unknown           = "255.255.255.255";
 
        name->flags             = WREPL_NAME_FLAGS(rec->type, rec->state, rec->node, rec->is_static);
 
@@ -158,13 +170,8 @@ static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx,
                NT_STATUS_HAVE_NO_MEMORY(ips);
 
                for (i = 0; i < num_ips; i++) {
-                       if (strcasecmp(WINSDB_OWNER_LOCAL, rec->addresses[i]->wins_owner) == 0) {
-                               ips[i].owner    = talloc_strdup(ips, our_address);
-                               NT_STATUS_HAVE_NO_MEMORY(ips[i].owner);
-                       } else {
-                               ips[i].owner    = rec->addresses[i]->wins_owner;
-                               talloc_steal(ips, rec->addresses[i]->wins_owner);
-                       }
+                       ips[i].owner    = rec->addresses[i]->wins_owner;
+                       talloc_steal(ips, rec->addresses[i]->wins_owner);
                        ips[i].ip       = rec->addresses[i]->address;
                        talloc_steal(ips, rec->addresses[i]->address);
                }
@@ -183,26 +190,18 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
        struct wrepl_wins_owner *owner_in = &call->req_packet.message.replication.info.owner;
        struct wrepl_replication *repl_out = &call->rep_packet.message.replication;
        struct wrepl_send_reply *reply_out = &call->rep_packet.message.replication.info.reply;
-       struct wreplsrv_owner local_owner;
        struct wreplsrv_owner *owner;
+       const char *owner_filter;
        const char *filter;
        struct ldb_result *res = NULL;
        int ret;
        struct wrepl_wins_name *names;
        struct winsdb_record *rec;
        NTSTATUS status;
-       uint32_t i;
-
-       if (strcmp(call->wreplconn->our_ip, owner_in->address) == 0) {
-               ZERO_STRUCT(local_owner);
-               local_owner.owner.address       = WINSDB_OWNER_LOCAL;
-               local_owner.owner.min_version   = 0;
-               local_owner.owner.max_version   = wreplsrv_local_max_version(service);
-               local_owner.owner.type          = 1;
-               owner = &local_owner;
-       } else {
-               owner = wreplsrv_find_owner(service->table, owner_in->address);
-       }
+       uint32_t i, j;
+       time_t now = time(NULL);
+
+       owner = wreplsrv_find_owner(service, service->table, owner_in->address);
 
        repl_out->command       = WREPL_REPL_SEND_REPLY;
        reply_out->num_names    = 0;
@@ -213,14 +212,29 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
         * return an empty list.
         */
        if (!owner) {
+               DEBUG(2,("WINSREPL:reply [0] records unknown owner[%s] to partner[%s]\n",
+                       owner_in->address, call->wreplconn->partner->address));
                return NT_STATUS_OK;
        }
 
+       /*
+        * the client sends a max_version of 0, interpret it as
+        * (uint64_t)-1
+        */
+       if (owner_in->max_version == 0) {
+               owner_in->max_version = (uint64_t)-1;
+       }
+
        /*
         * if the partner ask for nothing, or give invalid ranges,
         * return an empty list.
         */
        if (owner_in->min_version > owner_in->max_version) {
+               DEBUG(2,("WINSREPL:reply [0] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
+                       owner_in->address, 
+                       (long long)owner_in->min_version, 
+                       (long long)owner_in->max_version,
+                       call->wreplconn->partner->address));
                return NT_STATUS_OK;
        }
 
@@ -229,23 +243,34 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
         * return an empty list.
         */
        if (owner_in->min_version > owner->owner.max_version) {
+               DEBUG(2,("WINSREPL:reply [0] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
+                       owner_in->address, 
+                       (long long)owner_in->min_version, 
+                       (long long)owner_in->max_version,
+                       call->wreplconn->partner->address));
                return NT_STATUS_OK;
        }
 
+       owner_filter = wreplsrv_owner_filter(service, call, owner->owner.address);
+       NT_STATUS_HAVE_NO_MEMORY(owner_filter);
        filter = talloc_asprintf(call,
-                                "(&(winsOwner=%s)(objectClass=winsRecord)"
+                                "(&%s(objectClass=winsRecord)"
                                 "(|(recordState=%u)(recordState=%u))"
                                 "(versionID>=%llu)(versionID<=%llu))",
-                                owner->owner.address,
+                                owner_filter,
                                 WREPL_STATE_ACTIVE, WREPL_STATE_TOMBSTONE,
-                                owner_in->min_version, owner_in->max_version);
+                                (long long)owner_in->min_version, 
+                                (long long)owner_in->max_version);
        NT_STATUS_HAVE_NO_MEMORY(filter);
-       ret = ldb_search(service->wins_db, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res);
+       ret = ldb_search(service->wins_db->ldb, call, &res, NULL, LDB_SCOPE_SUBTREE, NULL, "%s", filter);
        if (ret != LDB_SUCCESS) return NT_STATUS_INTERNAL_DB_CORRUPTION;
-       talloc_steal(call, res);
+       DEBUG(10,("WINSREPL: filter '%s' count %d\n", filter, res->count));
+
        if (res->count == 0) {
                DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
-                       res->count, owner_in->address, owner_in->min_version, owner_in->max_version,
+                       res->count, owner_in->address, 
+                       (long long)owner_in->min_version, 
+                       (long long)owner_in->max_version,
                        call->wreplconn->partner->address));
                return NT_STATUS_OK;
        }
@@ -253,24 +278,37 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
        names = talloc_array(call, struct wrepl_wins_name, res->count);
        NT_STATUS_HAVE_NO_MEMORY(names);
 
-       for (i = 0; i < res->count; i++) {
-               status = winsdb_record(res->msgs[i], NULL, call, &rec);
+       for (i=0, j=0; i < res->count; i++) {
+               status = winsdb_record(service->wins_db, res->msgs[i], call, now, &rec);
                NT_STATUS_NOT_OK_RETURN(status);
 
-               status = wreplsrv_record2wins_name(names, call->wreplconn->our_ip, &names[i], rec);
-               NT_STATUS_NOT_OK_RETURN(status);
+               /*
+                * it's possible that winsdb_record() made the record RELEASED
+                * because it's expired, but in the database it's still stored
+                * as ACTIVE...
+                *
+                * make sure we really only replicate ACTIVE and TOMBSTONE records
+                */
+               if (rec->state == WREPL_STATE_ACTIVE || rec->state == WREPL_STATE_TOMBSTONE) {
+                       status = wreplsrv_record2wins_name(names, &names[j], rec);
+                       NT_STATUS_NOT_OK_RETURN(status);
+                       j++;
+               }
+
                talloc_free(rec);
                talloc_free(res->msgs[i]);
        }
 
        /* sort the names before we send them */
-       qsort(names, res->count, sizeof(struct wrepl_wins_name), (comparison_fn_t)wreplsrv_in_sort_wins_name);
+       qsort(names, j, sizeof(struct wrepl_wins_name), (comparison_fn_t)wreplsrv_in_sort_wins_name);
 
        DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
-               res->count, owner_in->address, owner_in->min_version, owner_in->max_version,
+               j, owner_in->address, 
+               (long long)owner_in->min_version, 
+               (long long)owner_in->max_version,
                call->wreplconn->partner->address));
 
-       reply_out->num_names    = res->count;
+       reply_out->num_names    = j;
        reply_out->names        = names;
 
        return NT_STATUS_OK;
@@ -302,6 +340,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
        struct wreplsrv_out_connection *wrepl_out;
        struct wrepl_table *update_in = &call->req_packet.message.replication.info.table;
        struct wreplsrv_in_update_state *update_state;
+       uint16_t fde_flags;
 
        DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n",
                call->wreplconn->partner->address,
@@ -312,6 +351,7 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
         * and do a WREPL_REPL_SEND_REQUEST's on the that connection
         * and then stop this connection
         */
+       fde_flags = event_get_fd_flags(wrepl_in->conn->event.fde);
        talloc_free(wrepl_in->conn->event.fde);
        wrepl_in->conn->event.fde = NULL;
 
@@ -326,9 +366,12 @@ static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
        wrepl_out->assoc_ctx.peer_ctx   = wrepl_in->assoc_ctx.peer_ctx;
        wrepl_out->sock                 = wrepl_socket_merge(wrepl_out,
                                                             wrepl_in->conn->event.ctx,
-                                                            wrepl_in->conn->socket);
+                                                            wrepl_in->conn->socket,
+                                                            wrepl_in->packet);
        NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock);
 
+       event_set_fd_flags(wrepl_out->sock->event.fde, fde_flags);
+
        update_state->wrepl_in                  = wrepl_in;
        update_state->wrepl_out                 = wrepl_out;
        update_state->cycle_io.in.partner       = wrepl_out->partner;
@@ -355,14 +398,12 @@ static NTSTATUS wreplsrv_in_update2(struct wreplsrv_in_call *call)
 static NTSTATUS wreplsrv_in_inform(struct wreplsrv_in_call *call)
 {
        struct wrepl_table *inform_in = &call->req_packet.message.replication.info.table;
-       NTSTATUS status;
 
        DEBUG(2,("WREPL_REPL_INFORM: partner[%s] initiator[%s] num_owners[%u]\n",
                call->wreplconn->partner->address,
                inform_in->initiator, inform_in->partner_count));
 
-       status = wreplsrv_sched_inform_action(call->wreplconn->partner, inform_in);
-       NT_STATUS_NOT_OK_RETURN(status);
+       wreplsrv_out_partner_pull(call->wreplconn->partner, inform_in);
 
        /* we don't reply to WREPL_REPL_INFORM messages */
        return ERROR_INVALID_PARAMETER;
@@ -391,12 +432,21 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
        }
 
        if (!call->wreplconn->partner) {
-               return wreplsrv_in_stop_assoc_ctx(call);
+               struct socket_address *partner_ip = socket_get_peer_addr(call->wreplconn->conn->socket, call);
+
+               call->wreplconn->partner = wreplsrv_find_partner(call->wreplconn->service, partner_ip->addr);
+               if (!call->wreplconn->partner) {
+                       DEBUG(1,("Failing WINS replication from non-partner %s\n",
+                                partner_ip ? partner_ip->addr : NULL));
+                       return wreplsrv_in_stop_assoc_ctx(call);
+               }
        }
 
        switch (repl_in->command) {
                case WREPL_REPL_TABLE_QUERY:
                        if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) {
+                               DEBUG(0,("Failing WINS replication TABLE_QUERY from non-push-partner %s\n",
+                                        call->wreplconn->partner->address));
                                return wreplsrv_in_stop_assoc_ctx(call);
                        }
                        status = wreplsrv_in_table_query(call);
@@ -407,6 +457,8 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
 
                case WREPL_REPL_SEND_REQUEST:
                        if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) {
+                               DEBUG(0,("Failing WINS replication SEND_REQUESET from non-push-partner %s\n",
+                                        call->wreplconn->partner->address));
                                return wreplsrv_in_stop_assoc_ctx(call);
                        }
                        status = wreplsrv_in_send_request(call);
@@ -417,6 +469,8 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
        
                case WREPL_REPL_UPDATE:
                        if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+                               DEBUG(0,("Failing WINS replication UPDATE from non-pull-partner %s\n",
+                                        call->wreplconn->partner->address));
                                return wreplsrv_in_stop_assoc_ctx(call);
                        }
                        status = wreplsrv_in_update(call);
@@ -424,6 +478,8 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
 
                case WREPL_REPL_UPDATE2:
                        if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+                               DEBUG(0,("Failing WINS replication UPDATE2 from non-pull-partner %s\n",
+                                        call->wreplconn->partner->address));
                                return wreplsrv_in_stop_assoc_ctx(call);
                        }
                        status = wreplsrv_in_update2(call);
@@ -431,6 +487,8 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
 
                case WREPL_REPL_INFORM:
                        if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+                               DEBUG(0,("Failing WINS replication INFORM from non-pull-partner %s\n",
+                                        call->wreplconn->partner->address));
                                return wreplsrv_in_stop_assoc_ctx(call);
                        }
                        status = wreplsrv_in_inform(call);
@@ -438,6 +496,8 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
 
                case WREPL_REPL_INFORM2:
                        if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+                               DEBUG(0,("Failing WINS replication INFORM2 from non-pull-partner %s\n",
+                                        call->wreplconn->partner->address));
                                return wreplsrv_in_stop_assoc_ctx(call);
                        }
                        status = wreplsrv_in_inform2(call);