This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
+ the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
-#include "dlinklist.h"
#include "lib/events/events.h"
#include "lib/socket/socket.h"
#include "smbd/service_task.h"
#include "smbd/service_stream.h"
-#include "lib/messaging/irpc.h"
-#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "librpc/gen_ndr/winsrepl.h"
#include "wrepl_server/wrepl_server.h"
-#include "wrepl_server/wrepl_out_helpers.h"
#include "nbt_server/wins/winsdb.h"
-#include "ldb/include/ldb.h"
#include "libcli/composite/composite.h"
#include "libcli/wrepl/winsrepl.h"
+#include "libcli/resolve/resolve.h"
+#include "param/param.h"
enum wreplsrv_out_connect_stage {
WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
enum wreplsrv_out_connect_stage stage;
struct composite_context *c;
struct wrepl_request *req;
+ struct composite_context *c_req;
struct wrepl_associate assoc_io;
enum winsrepl_partner_type type;
struct wreplsrv_out_connection *wreplconn;
};
-static void wreplsrv_out_connect_handler(struct wrepl_request *req);
+static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
{
NTSTATUS status;
- status = wrepl_connect_recv(state->req);
+ status = wrepl_connect_recv(state->c_req);
NT_STATUS_NOT_OK_RETURN(status);
state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
NT_STATUS_HAVE_NO_MEMORY(state->req);
- state->req->async.fn = wreplsrv_out_connect_handler;
+ state->req->async.fn = wreplsrv_out_connect_handler_req;
state->req->async.private = state;
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
return NT_STATUS_OK;
}
-static void wreplsrv_out_connect_handler(struct wrepl_request *req)
+static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
{
- struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
- struct wreplsrv_out_connect_state);
struct composite_context *c = state->c;
switch (state->stage) {
}
}
+static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
+{
+ struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
+ struct wreplsrv_out_connect_state);
+ wreplsrv_out_connect_handler(state);
+ return;
+}
+
+static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
+{
+ struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
+ struct wreplsrv_out_connect_state);
+ wreplsrv_out_connect_handler(state);
+ return;
+}
+
static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
enum winsrepl_partner_type type,
struct wreplsrv_out_connection *wreplconn)
struct wreplsrv_service *service = partner->service;
struct wreplsrv_out_connect_state *state = NULL;
struct wreplsrv_out_connection **wreplconnp = &wreplconn;
- BOOL cached_connection = False;
+ bool cached_connection = false;
c = talloc_zero(partner, struct composite_context);
if (!c) goto failed;
c->private_data = state;
if (type == WINSREPL_PARTNER_PUSH) {
- cached_connection = True;
+ cached_connection = true;
wreplconn = partner->push.wreplconn;
wreplconnp = &partner->push.wreplconn;
} else if (type == WINSREPL_PARTNER_PULL) {
- cached_connection = True;
+ cached_connection = true;
wreplconn = partner->pull.wreplconn;
wreplconnp = &partner->pull.wreplconn;
}
if (!wreplconn->sock->dead) {
state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
state->wreplconn= wreplconn;
- composite_trigger_done(c);
+ composite_done(c);
return c;
} else if (!cached_connection) {
state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
state->wreplconn= NULL;
- composite_trigger_done(c);
+ composite_done(c);
return c;
} else {
talloc_free(wreplconn);
wreplconn->service = service;
wreplconn->partner = partner;
- wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx);
+ wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
if (!wreplconn->sock) goto failed;
state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
state->wreplconn= wreplconn;
- state->req = wrepl_connect_send(wreplconn->sock,
- partner->our_address,
+ state->c_req = wrepl_connect_send(wreplconn->sock,
+ partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
partner->address);
- if (!state->req) goto failed;
+ if (!state->c_req) goto failed;
- state->req->async.fn = wreplsrv_out_connect_handler;
- state->req->async.private = state;
+ state->c_req->async.fn = wreplsrv_out_connect_handler_creq;
+ state->c_req->async.private_data = state;
return c;
failed:
}
+struct wreplsrv_pull_table_io {
+ struct {
+ struct wreplsrv_partner *partner;
+ uint32_t num_owners;
+ struct wrepl_wins_owner *owners;
+ } in;
+ struct {
+ uint32_t num_owners;
+ struct wrepl_wins_owner *owners;
+ } out;
+};
+
enum wreplsrv_pull_table_stage {
WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
return;
}
-struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
+static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
{
struct composite_context *c = NULL;
struct wreplsrv_service *service = io->in.partner->service;
state->table_io.out.num_partners = io->in.num_owners;
state->table_io.out.partners = io->in.owners;
state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
- composite_trigger_done(c);
+ composite_done(c);
return c;
}
return NULL;
}
-NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
- struct wreplsrv_pull_table_io *io)
+static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+ struct wreplsrv_pull_table_io *io)
{
NTSTATUS status;
struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
struct wreplsrv_pull_table_state);
io->out.num_owners = state->table_io.out.num_partners;
- io->out.owners = state->table_io.out.partners;
- talloc_reference(mem_ctx, state->table_io.out.partners);
+ io->out.owners = talloc_reference(mem_ctx, state->table_io.out.partners);
}
talloc_free(c);
return status;
}
+struct wreplsrv_pull_names_io {
+ struct {
+ struct wreplsrv_partner *partner;
+ struct wreplsrv_out_connection *wreplconn;
+ struct wrepl_wins_owner owner;
+ } in;
+ struct {
+ uint32_t num_names;
+ struct wrepl_name *names;
+ } out;
+};
+
enum wreplsrv_pull_names_stage {
WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
return;
}
-struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
+static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
{
struct composite_context *c = NULL;
struct wreplsrv_service *service = io->in.partner->service;
return NULL;
}
-NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
- struct wreplsrv_pull_names_io *io)
+static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
+ struct wreplsrv_pull_names_io *io)
{
NTSTATUS status;
struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
struct wreplsrv_pull_names_state);
io->out.num_names = state->pull_io.out.num_names;
- io->out.names = state->pull_io.out.names;
- talloc_reference(mem_ctx, state->pull_io.out.names);
+ io->out.names = talloc_reference(mem_ctx, state->pull_io.out.names);
}
talloc_free(c);
static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
{
- struct wreplsrv_owner *current_owner;
+ struct wreplsrv_owner *current_owner=NULL;
struct wreplsrv_owner *local_owner;
uint32_t i;
uint64_t old_max_version = 0;
- BOOL do_pull = False;
+ bool do_pull = false;
for (i=state->current; i < state->table_io.out.num_owners; i++) {
- current_owner = wreplsrv_find_owner(state->io->in.partner->pull.table,
+ current_owner = wreplsrv_find_owner(state->io->in.partner->service,
+ state->io->in.partner->pull.table,
state->table_io.out.owners[i].address);
- local_owner = wreplsrv_find_owner(state->io->in.partner->service->table,
+ local_owner = wreplsrv_find_owner(state->io->in.partner->service,
+ state->io->in.partner->service->table,
state->table_io.out.owners[i].address);
/*
* this means we are ourself the current owner,
* so fetch them
*/
if (!local_owner) {
- do_pull = True;
+ do_pull = true;
break;
}
* fetch them
*/
if (current_owner->owner.max_version > local_owner->owner.max_version) {
- do_pull = True;
+ do_pull = true;
old_max_version = local_owner->owner.max_version;
break;
}
/* update partner table */
for (i=0; i < state->table_io.out.num_owners; i++) {
- BOOL is_our_addr;
-
- is_our_addr = wreplsrv_is_our_address(state->io->in.partner->service,
- state->table_io.out.owners[i].address);
- if (is_our_addr) continue;
-
status = wreplsrv_add_table(state->io->in.partner->service,
state->io->in.partner,
&state->io->in.partner->pull.table,
{
NTSTATUS status;
- status = wreplsrv_apply_records(state->io->in.partner, &state->names_io);
+ status = wreplsrv_apply_records(state->io->in.partner,
+ &state->names_io.in.owner,
+ state->names_io.out.num_names,
+ state->names_io.out.names);
NT_STATUS_NOT_OK_RETURN(status);
talloc_free(state->names_io.out.names);
enum wreplsrv_push_notify_stage {
WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
- WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
+ WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};
struct composite_context *c;
struct wreplsrv_push_notify_io *io;
enum wrepl_replication_cmd command;
- BOOL full_table;
+ bool full_table;
+ struct wrepl_send_ctrl ctrl;
struct wrepl_request *req;
struct wrepl_packet req_packet;
struct wrepl_packet *rep_packet;
struct wreplsrv_in_connection *wrepl_in;
NTSTATUS status;
struct socket_context *sock;
- struct data_blob_list_item *update_rep;
- const char *our_ip;
- DATA_BLOB update_blob;
+ struct packet_context *packet;
+ uint16_t fde_flags;
+ /* prepare the outgoing request */
req->opcode = WREPL_OPCODE_BITS;
req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
req->mess_type = WREPL_REPLICATION;
repl_out->command = state->command;
- our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
- NT_STATUS_HAVE_NO_MEMORY(our_ip);
-
status = wreplsrv_fill_wrepl_table(service, state, table_out,
- our_ip, our_ip, state->full_table);
+ service->wins_db->local_owner, state->full_table);
NT_STATUS_NOT_OK_RETURN(status);
- state->req = wrepl_request_send(state->wreplconn->sock, req);
+ /* queue the request */
+ state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
NT_STATUS_HAVE_NO_MEMORY(state->req);
+ /*
+ * now we need to convert the wrepl_socket (client connection)
+ * into a wreplsrv_in_connection (server connection), because
+ * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
+ * message is received by the peer.
+ */
+
+ /* steal the socket_context */
sock = state->wreplconn->sock->sock;
- talloc_steal(state, state->wreplconn->sock->sock);
state->wreplconn->sock->sock = NULL;
+ talloc_steal(state, sock);
- update_blob = state->req->buffer;
- talloc_steal(state, state->req->buffer.data);
+ /*
+ * steal the packet_context
+ * note the request DATA_BLOB we just send on the
+ * wrepl_socket (client connection) is still unter the
+ * packet context and will be send to the wire
+ */
+ packet = state->wreplconn->sock->packet;
+ state->wreplconn->sock->packet = NULL;
+ talloc_steal(state, packet);
+
+ /*
+ * get the fde_flags of the old fde event,
+ * so that we can later set the same flags to the new one
+ */
+ fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
+ /*
+ * free the wrepl_socket (client connection)
+ */
talloc_free(state->wreplconn->sock);
state->wreplconn->sock = NULL;
+ /*
+ * now create a wreplsrv_in_connection,
+ * on which we act as server
+ *
+ * NOTE: sock and packet will be stolen by
+ * wreplsrv_in_connection_merge()
+ */
status = wreplsrv_in_connection_merge(state->io->in.partner,
- sock, &wrepl_in);
+ sock, packet, &wrepl_in);
NT_STATUS_NOT_OK_RETURN(status);
+ event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
+
wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx;
wrepl_in->assoc_ctx.our_ctx = 0;
- update_rep = talloc(wrepl_in, struct data_blob_list_item);
- NT_STATUS_HAVE_NO_MEMORY(update_rep);
-
- update_rep->blob = update_blob;
- talloc_steal(update_rep, update_blob.data);
-
+ /* now we can free the wreplsrv_out_connection */
talloc_free(state->wreplconn);
state->wreplconn = NULL;
- if (!wrepl_in->send_queue) {
- EVENT_FD_WRITEABLE(wrepl_in->conn->event.fde);
- }
- DLIST_ADD_END(wrepl_in->send_queue, update_rep, struct data_blob_list_item *);
-
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
return NT_STATUS_OK;
struct wrepl_replication *repl_out = &state->req_packet.message.replication;
struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
NTSTATUS status;
- const char *our_ip;
req->opcode = WREPL_OPCODE_BITS;
req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
repl_out->command = state->command;
- our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
- NT_STATUS_HAVE_NO_MEMORY(our_ip);
-
status = wreplsrv_fill_wrepl_table(service, state, table_out,
- our_ip, our_ip, state->full_table);
+ service->wins_db->local_owner, state->full_table);
NT_STATUS_NOT_OK_RETURN(status);
- state->req = wrepl_request_send(state->wreplconn->sock, req);
+ /* we won't get a reply to a inform message */
+ state->ctrl.send_only = true;
+
+ state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
NT_STATUS_HAVE_NO_MEMORY(state->req);
- /* we won't get a reply to a inform message */
- state->req->send_only = True;
state->req->async.fn = wreplsrv_push_notify_handler_req;
state->req->async.private = state;
- state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE;
+ state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
return NT_STATUS_OK;
}
switch (state->command) {
case WREPL_REPL_UPDATE:
- state->full_table = True;
+ state->full_table = true;
return wreplsrv_push_notify_update(state);
case WREPL_REPL_UPDATE2:
- state->full_table = False;
+ state->full_table = false;
return wreplsrv_push_notify_update(state);
case WREPL_REPL_INFORM:
- state->full_table = True;
+ state->full_table = true;
return wreplsrv_push_notify_inform(state);
case WREPL_REPL_INFORM2:
- state->full_table = False;
+ state->full_table = false;
return wreplsrv_push_notify_inform(state);
default:
return NT_STATUS_INTERNAL_ERROR;
return NT_STATUS_INTERNAL_ERROR;
}
-static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state)
+static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
{
- return NT_STATUS_FOOBAR;
+ NTSTATUS status;
+
+ status = wrepl_request_recv(state->req, state, NULL);
+ NT_STATUS_NOT_OK_RETURN(status);
+
+ state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
+ return status;
}
static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
c->status = wreplsrv_push_notify_wait_connect(state);
break;
- case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE:
- c->status = wreplsrv_push_notify_wait_update(state);
+ case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
+ c->status = wreplsrv_push_notify_wait_inform(state);
break;
case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
c->status = NT_STATUS_INTERNAL_ERROR;