wreplsrv: log a successful replication cycle at level 1
[kai/samba.git] / source4 / wrepl_server / wrepl_out_helpers.c
index 36d189a4d00a6e86201487f0137f4f26b09ccb3a..e1e3f38b12e5a1e0463d6a7a1c1496be9964234f 100644 (file)
@@ -7,7 +7,7 @@
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
-#include "dlinklist.h"
 #include "lib/events/events.h"
 #include "lib/socket/socket.h"
 #include "smbd/service_task.h"
 #include "smbd/service_stream.h"
-#include "lib/messaging/irpc.h"
-#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "librpc/gen_ndr/winsrepl.h"
 #include "wrepl_server/wrepl_server.h"
-#include "wrepl_server/wrepl_out_helpers.h"
 #include "nbt_server/wins/winsdb.h"
-#include "ldb/include/ldb.h"
 #include "libcli/composite/composite.h"
 #include "libcli/wrepl/winsrepl.h"
+#include "libcli/resolve/resolve.h"
+#include "param/param.h"
 
 enum wreplsrv_out_connect_stage {
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
@@ -45,24 +42,26 @@ struct wreplsrv_out_connect_state {
        enum wreplsrv_out_connect_stage stage;
        struct composite_context *c;
        struct wrepl_request *req;
+       struct composite_context *c_req;
        struct wrepl_associate assoc_io;
        enum winsrepl_partner_type type;
        struct wreplsrv_out_connection *wreplconn;
 };
 
-static void wreplsrv_out_connect_handler(struct wrepl_request *req);
+static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
 
 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
 {
        NTSTATUS status;
 
-       status = wrepl_connect_recv(state->req);
+       status = wrepl_connect_recv(state->c_req);
        NT_STATUS_NOT_OK_RETURN(status);
 
        state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
-       state->req->async.fn            = wreplsrv_out_connect_handler;
+       state->req->async.fn            = wreplsrv_out_connect_handler_req;
        state->req->async.private       = state;
 
        state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
@@ -92,10 +91,8 @@ static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_
        return NT_STATUS_OK;
 }
 
-static void wreplsrv_out_connect_handler(struct wrepl_request *req)
+static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
 {
-       struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
-                                                  struct wreplsrv_out_connect_state);
        struct composite_context *c = state->c;
 
        switch (state->stage) {
@@ -119,6 +116,22 @@ static void wreplsrv_out_connect_handler(struct wrepl_request *req)
        }
 }
 
+static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
+{
+       struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
+                                                  struct wreplsrv_out_connect_state);
+       wreplsrv_out_connect_handler(state);
+       return;
+}
+
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
+{
+       struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
+                                                  struct wreplsrv_out_connect_state);
+       wreplsrv_out_connect_handler(state);
+       return;
+}
+
 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
                                                           enum winsrepl_partner_type type,
                                                           struct wreplsrv_out_connection *wreplconn)
@@ -127,7 +140,7 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn
        struct wreplsrv_service *service = partner->service;
        struct wreplsrv_out_connect_state *state = NULL;
        struct wreplsrv_out_connection **wreplconnp = &wreplconn;
-       BOOL cached_connection = False;
+       bool cached_connection = false;
 
        c = talloc_zero(partner, struct composite_context);
        if (!c) goto failed;
@@ -142,11 +155,11 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn
        c->private_data = state;
 
        if (type == WINSREPL_PARTNER_PUSH) {
-               cached_connection       = True;
+               cached_connection       = true;
                wreplconn               = partner->push.wreplconn;
                wreplconnp              = &partner->push.wreplconn;
        } else if (type == WINSREPL_PARTNER_PULL) {
-               cached_connection       = True;
+               cached_connection       = true;
                wreplconn               = partner->pull.wreplconn;
                wreplconnp              = &partner->pull.wreplconn;
        }
@@ -156,12 +169,12 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn
                if (!wreplconn->sock->dead) {
                        state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
                        state->wreplconn= wreplconn;
-                       composite_trigger_done(c);
+                       composite_done(c);
                        return c;
                } else if (!cached_connection) {
                        state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
                        state->wreplconn= NULL;
-                       composite_trigger_done(c);
+                       composite_done(c);
                        return c;
                } else {
                        talloc_free(wreplconn);
@@ -174,18 +187,18 @@ static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partn
 
        wreplconn->service      = service;
        wreplconn->partner      = partner;
-       wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx);
+       wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
        if (!wreplconn->sock) goto failed;
 
        state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
        state->wreplconn= wreplconn;
-       state->req      = wrepl_connect_send(wreplconn->sock,
-                                            partner->our_address,
+       state->c_req    = wrepl_connect_send(wreplconn->sock,
+                                            partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
                                             partner->address);
-       if (!state->req) goto failed;
+       if (!state->c_req) goto failed;
 
-       state->req->async.fn            = wreplsrv_out_connect_handler;
-       state->req->async.private       = state;
+       state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
+       state->c_req->async.private_data        = state;
 
        return c;
 failed:
@@ -216,6 +229,18 @@ static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CT
        
 }
 
+struct wreplsrv_pull_table_io {
+       struct {
+               struct wreplsrv_partner *partner;
+               uint32_t num_owners;
+               struct wrepl_wins_owner *owners;
+       } in;
+       struct {
+               uint32_t num_owners;
+               struct wrepl_wins_owner *owners;
+       } out;
+};
+
 enum wreplsrv_pull_table_stage {
        WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
        WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
@@ -306,7 +331,7 @@ static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
        return;
 }
 
-struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
+static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
 {
        struct composite_context *c = NULL;
        struct wreplsrv_service *service = io->in.partner->service;
@@ -328,7 +353,7 @@ struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct w
                state->table_io.out.num_partners        = io->in.num_owners;
                state->table_io.out.partners            = io->in.owners;
                state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
-               composite_trigger_done(c);
+               composite_done(c);
                return c;
        }
 
@@ -345,8 +370,8 @@ failed:
        return NULL;
 }
 
-NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
-                                 struct wreplsrv_pull_table_io *io)
+static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+                                        struct wreplsrv_pull_table_io *io)
 {
        NTSTATUS status;
 
@@ -356,14 +381,25 @@ NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_c
                struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
                                                          struct wreplsrv_pull_table_state);
                io->out.num_owners      = state->table_io.out.num_partners;
-               io->out.owners          = state->table_io.out.partners;
-               talloc_reference(mem_ctx, state->table_io.out.partners);
+               io->out.owners          = talloc_reference(mem_ctx, state->table_io.out.partners);
        }
 
        talloc_free(c);
        return status;  
 }
 
+struct wreplsrv_pull_names_io {
+       struct {
+               struct wreplsrv_partner *partner;
+               struct wreplsrv_out_connection *wreplconn;
+               struct wrepl_wins_owner owner;
+       } in;
+       struct {
+               uint32_t num_names;
+               struct wrepl_name *names;
+       } out;
+};
+
 enum wreplsrv_pull_names_stage {
        WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
        WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
@@ -455,7 +491,7 @@ static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
        return;
 }
 
-struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
+static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
 {
        struct composite_context *c = NULL;
        struct wreplsrv_service *service = io->in.partner->service;
@@ -489,8 +525,8 @@ failed:
        return NULL;
 }
 
-NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
-                                 struct wreplsrv_pull_names_io *io)
+static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+                                        struct wreplsrv_pull_names_io *io)
 {
        NTSTATUS status;
 
@@ -500,8 +536,7 @@ NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_c
                struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
                                                          struct wreplsrv_pull_names_state);
                io->out.num_names       = state->pull_io.out.num_names;
-               io->out.names           = state->pull_io.out.names;
-               talloc_reference(mem_ctx, state->pull_io.out.names);
+               io->out.names           = talloc_reference(mem_ctx, state->pull_io.out.names);
        }
 
        talloc_free(c);
@@ -533,17 +568,19 @@ static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
 
 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
 {
-       struct wreplsrv_owner *current_owner;
+       struct wreplsrv_owner *current_owner=NULL;
        struct wreplsrv_owner *local_owner;
        uint32_t i;
        uint64_t old_max_version = 0;
-       BOOL do_pull = False;
+       bool do_pull = false;
 
        for (i=state->current; i < state->table_io.out.num_owners; i++) {
-               current_owner = wreplsrv_find_owner(state->io->in.partner->pull.table,
+               current_owner = wreplsrv_find_owner(state->io->in.partner->service,
+                                                   state->io->in.partner->pull.table,
                                                    state->table_io.out.owners[i].address);
 
-               local_owner = wreplsrv_find_owner(state->io->in.partner->service->table,
+               local_owner = wreplsrv_find_owner(state->io->in.partner->service,
+                                                 state->io->in.partner->service->table,
                                                  state->table_io.out.owners[i].address);
                /*
                 * this means we are ourself the current owner,
@@ -556,7 +593,7 @@ static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycl
                 * so fetch them
                 */
                if (!local_owner) {
-                       do_pull         = True;
+                       do_pull         = true;
                        
                        break;
                }
@@ -566,7 +603,7 @@ static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycl
                 * fetch them
                 */
                if (current_owner->owner.max_version > local_owner->owner.max_version) {
-                       do_pull         = True;
+                       do_pull         = true;
                        old_max_version = local_owner->owner.max_version;
                        break;
                }
@@ -627,12 +664,6 @@ static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_
 
        /* update partner table */
        for (i=0; i < state->table_io.out.num_owners; i++) {
-               BOOL is_our_addr;
-
-               is_our_addr = wreplsrv_is_our_address(state->io->in.partner->service,
-                                                     state->table_io.out.owners[i].address);
-               if (is_our_addr) continue;
-
                status = wreplsrv_add_table(state->io->in.partner->service,
                                            state->io->in.partner, 
                                            &state->io->in.partner->pull.table,
@@ -651,7 +682,10 @@ static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_sta
 {
        NTSTATUS status;
 
-       status = wreplsrv_apply_records(state->io->in.partner, &state->names_io);
+       status = wreplsrv_apply_records(state->io->in.partner,
+                                       &state->names_io.in.owner,
+                                       state->names_io.out.num_names,
+                                       state->names_io.out.names);
        NT_STATUS_NOT_OK_RETURN(status);
 
        talloc_free(state->names_io.out.names);
@@ -786,7 +820,7 @@ NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
 
 enum wreplsrv_push_notify_stage {
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
-       WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
+       WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
        WREPLSRV_PUSH_NOTIFY_STAGE_DONE
 };
 
@@ -795,7 +829,8 @@ struct wreplsrv_push_notify_state {
        struct composite_context *c;
        struct wreplsrv_push_notify_io *io;
        enum wrepl_replication_cmd command;
-       BOOL full_table;
+       bool full_table;
+       struct wrepl_send_ctrl ctrl;
        struct wrepl_request *req;
        struct wrepl_packet req_packet;
        struct wrepl_packet *rep_packet;
@@ -815,57 +850,78 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
        struct wreplsrv_in_connection *wrepl_in;
        NTSTATUS status;
        struct socket_context *sock;
-       struct data_blob_list_item *update_rep;
-       const char *our_ip;
-       DATA_BLOB update_blob;
+       struct packet_context *packet;
+       uint16_t fde_flags;
 
+       /* prepare the outgoing request */
        req->opcode     = WREPL_OPCODE_BITS;
        req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
        req->mess_type  = WREPL_REPLICATION;
 
        repl_out->command = state->command;
 
-       our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
-       NT_STATUS_HAVE_NO_MEMORY(our_ip);
-
        status = wreplsrv_fill_wrepl_table(service, state, table_out,
-                                          our_ip, our_ip, state->full_table);
+                                          service->wins_db->local_owner, state->full_table);
        NT_STATUS_NOT_OK_RETURN(status);
 
-       state->req = wrepl_request_send(state->wreplconn->sock, req);
+       /* queue the request */
+       state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
+       /*
+        * now we need to convert the wrepl_socket (client connection)
+        * into a wreplsrv_in_connection (server connection), because
+        * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
+        * message is received by the peer.
+        */
+
+       /* steal the socket_context */
        sock = state->wreplconn->sock->sock;
-       talloc_steal(state, state->wreplconn->sock->sock);
        state->wreplconn->sock->sock = NULL;
+       talloc_steal(state, sock);
 
-       update_blob = state->req->buffer;
-       talloc_steal(state, state->req->buffer.data);
+       /* 
+        * steal the packet_context
+        * note the request DATA_BLOB we just send on the
+        * wrepl_socket (client connection) is still unter the 
+        * packet context and will be send to the wire
+        */
+       packet = state->wreplconn->sock->packet;
+       state->wreplconn->sock->packet = NULL;
+       talloc_steal(state, packet);
+
+       /*
+        * get the fde_flags of the old fde event,
+        * so that we can later set the same flags to the new one
+        */
+       fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
 
+       /*
+        * free the wrepl_socket (client connection)
+        */
        talloc_free(state->wreplconn->sock);
        state->wreplconn->sock = NULL;
 
+       /*
+        * now create a wreplsrv_in_connection,
+        * on which we act as server
+        *
+        * NOTE: sock and packet will be stolen by
+        *       wreplsrv_in_connection_merge()
+        */
        status = wreplsrv_in_connection_merge(state->io->in.partner,
-                                             sock, &wrepl_in);
+                                             sock, packet, &wrepl_in);
        NT_STATUS_NOT_OK_RETURN(status);
 
+       event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
+
        wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
        wrepl_in->assoc_ctx.our_ctx     = 0;
 
-       update_rep = talloc(wrepl_in, struct data_blob_list_item);
-       NT_STATUS_HAVE_NO_MEMORY(update_rep);
-
-       update_rep->blob = update_blob;
-       talloc_steal(update_rep, update_blob.data);
-
+       /* now we can free the wreplsrv_out_connection */
        talloc_free(state->wreplconn);
        state->wreplconn = NULL;
 
-       if (!wrepl_in->send_queue) {
-               EVENT_FD_WRITEABLE(wrepl_in->conn->event.fde);
-       }
-       DLIST_ADD_END(wrepl_in->send_queue, update_rep, struct data_blob_list_item *);
-
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
 
        return NT_STATUS_OK;
@@ -878,7 +934,6 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s
        struct wrepl_replication *repl_out = &state->req_packet.message.replication;
        struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
        NTSTATUS status;
-       const char *our_ip;
 
        req->opcode     = WREPL_OPCODE_BITS;
        req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
@@ -886,22 +941,20 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s
 
        repl_out->command = state->command;
 
-       our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
-       NT_STATUS_HAVE_NO_MEMORY(our_ip);
-
        status = wreplsrv_fill_wrepl_table(service, state, table_out,
-                                          our_ip, our_ip, state->full_table);
+                                          service->wins_db->local_owner, state->full_table);
        NT_STATUS_NOT_OK_RETURN(status);
 
-       state->req = wrepl_request_send(state->wreplconn->sock, req);
+       /* we won't get a reply to a inform message */
+       state->ctrl.send_only           = true;
+
+       state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
        NT_STATUS_HAVE_NO_MEMORY(state->req);
 
-       /* we won't get a reply to a inform message */
-       state->req->send_only           = True;
        state->req->async.fn            = wreplsrv_push_notify_handler_req;
        state->req->async.private       = state;
 
-       state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE;
+       state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
 
        return NT_STATUS_OK;
 }
@@ -915,16 +968,16 @@ static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_st
 
        switch (state->command) {
        case WREPL_REPL_UPDATE:
-               state->full_table = True;
+               state->full_table = true;
                return wreplsrv_push_notify_update(state);
        case WREPL_REPL_UPDATE2:
-               state->full_table = False;
+               state->full_table = false;
                return wreplsrv_push_notify_update(state);
        case WREPL_REPL_INFORM:
-               state->full_table = True;
+               state->full_table = true;
                return wreplsrv_push_notify_inform(state);
        case WREPL_REPL_INFORM2:
-               state->full_table = False;
+               state->full_table = false;
                return wreplsrv_push_notify_inform(state);
        default:
                return NT_STATUS_INTERNAL_ERROR;
@@ -933,9 +986,15 @@ static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_st
        return NT_STATUS_INTERNAL_ERROR;
 }
 
-static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state)
+static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
 {
-       return NT_STATUS_FOOBAR;
+       NTSTATUS status;
+
+       status =  wrepl_request_recv(state->req, state, NULL);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
+       return status;
 }
 
 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
@@ -946,8 +1005,8 @@ static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *stat
        case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
                c->status = wreplsrv_push_notify_wait_connect(state);
                break;
-       case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE:
-               c->status = wreplsrv_push_notify_wait_update(state);
+       case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
+               c->status = wreplsrv_push_notify_wait_inform(state);
                break;
        case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
                c->status = NT_STATUS_INTERNAL_ERROR;