2 Unix SMB/CIFS implementation.
4 WINS Replication server
6 Copyright (C) Stefan Metzmacher 2005
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #include "lib/events/events.h"
25 #include "lib/socket/socket.h"
26 #include "smbd/service_task.h"
27 #include "smbd/service_stream.h"
28 #include "librpc/gen_ndr/ndr_winsrepl.h"
29 #include "wrepl_server/wrepl_server.h"
30 #include "wrepl_server/wrepl_out_helpers.h"
31 #include "libcli/composite/composite.h"
32 #include "libcli/wrepl/winsrepl.h"
34 enum wreplsrv_out_connect_stage {
35 WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
36 WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
37 WREPLSRV_OUT_CONNECT_STAGE_DONE
40 struct wreplsrv_out_connect_state {
41 enum wreplsrv_out_connect_stage stage;
42 struct composite_context *c;
43 struct wrepl_request *req;
44 struct composite_context *c_req;
45 struct wrepl_associate assoc_io;
46 enum winsrepl_partner_type type;
47 struct wreplsrv_out_connection *wreplconn;
50 static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
51 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
53 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
57 status = wrepl_connect_recv(state->c_req);
58 NT_STATUS_NOT_OK_RETURN(status);
60 state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
61 NT_STATUS_HAVE_NO_MEMORY(state->req);
63 state->req->async.fn = wreplsrv_out_connect_handler_req;
64 state->req->async.private = state;
66 state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
71 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
75 status = wrepl_associate_recv(state->req, &state->assoc_io);
76 NT_STATUS_NOT_OK_RETURN(status);
78 state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
80 if (state->type == WINSREPL_PARTNER_PUSH) {
81 state->wreplconn->partner->push.wreplconn = state->wreplconn;
82 talloc_steal(state->wreplconn->partner, state->wreplconn);
83 } else if (state->type == WINSREPL_PARTNER_PULL) {
84 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
85 talloc_steal(state->wreplconn->partner, state->wreplconn);
88 state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
93 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
95 struct composite_context *c = state->c;
97 switch (state->stage) {
98 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
99 c->status = wreplsrv_out_connect_wait_socket(state);
101 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
102 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
103 c->state = COMPOSITE_STATE_DONE;
105 case WREPLSRV_OUT_CONNECT_STAGE_DONE:
106 c->status = NT_STATUS_INTERNAL_ERROR;
109 if (!NT_STATUS_IS_OK(c->status)) {
110 c->state = COMPOSITE_STATE_ERROR;
113 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
118 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
120 struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
121 struct wreplsrv_out_connect_state);
122 wreplsrv_out_connect_handler(state);
126 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
128 struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
129 struct wreplsrv_out_connect_state);
130 wreplsrv_out_connect_handler(state);
134 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
135 enum winsrepl_partner_type type,
136 struct wreplsrv_out_connection *wreplconn)
138 struct composite_context *c = NULL;
139 struct wreplsrv_service *service = partner->service;
140 struct wreplsrv_out_connect_state *state = NULL;
141 struct wreplsrv_out_connection **wreplconnp = &wreplconn;
142 BOOL cached_connection = False;
144 c = talloc_zero(partner, struct composite_context);
147 state = talloc_zero(c, struct wreplsrv_out_connect_state);
148 if (!state) goto failed;
152 c->state = COMPOSITE_STATE_IN_PROGRESS;
153 c->event_ctx = service->task->event_ctx;
154 c->private_data = state;
156 if (type == WINSREPL_PARTNER_PUSH) {
157 cached_connection = True;
158 wreplconn = partner->push.wreplconn;
159 wreplconnp = &partner->push.wreplconn;
160 } else if (type == WINSREPL_PARTNER_PULL) {
161 cached_connection = True;
162 wreplconn = partner->pull.wreplconn;
163 wreplconnp = &partner->pull.wreplconn;
166 /* we have a connection already, so use it */
168 if (!wreplconn->sock->dead) {
169 state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
170 state->wreplconn= wreplconn;
173 } else if (!cached_connection) {
174 state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
175 state->wreplconn= NULL;
179 talloc_free(wreplconn);
184 wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
185 if (!wreplconn) goto failed;
187 wreplconn->service = service;
188 wreplconn->partner = partner;
189 wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx);
190 if (!wreplconn->sock) goto failed;
192 state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
193 state->wreplconn= wreplconn;
194 state->c_req = wrepl_connect_send(wreplconn->sock,
195 partner->our_address,
197 if (!state->c_req) goto failed;
199 state->c_req->async.fn = wreplsrv_out_connect_handler_creq;
200 state->c_req->async.private_data = state;
208 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
209 struct wreplsrv_out_connection **wreplconn)
213 status = composite_wait(c);
215 if (NT_STATUS_IS_OK(status)) {
216 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
217 struct wreplsrv_out_connect_state);
218 if (state->wreplconn) {
219 *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
220 if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
222 status = NT_STATUS_INVALID_CONNECTION;
231 enum wreplsrv_pull_table_stage {
232 WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
233 WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
234 WREPLSRV_PULL_TABLE_STAGE_DONE
237 struct wreplsrv_pull_table_state {
238 enum wreplsrv_pull_table_stage stage;
239 struct composite_context *c;
240 struct wrepl_request *req;
241 struct wrepl_pull_table table_io;
242 struct wreplsrv_pull_table_io *io;
243 struct composite_context *creq;
244 struct wreplsrv_out_connection *wreplconn;
247 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
249 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
253 status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
254 NT_STATUS_NOT_OK_RETURN(status);
256 state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
257 state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
258 NT_STATUS_HAVE_NO_MEMORY(state->req);
260 state->req->async.fn = wreplsrv_pull_table_handler_req;
261 state->req->async.private = state;
263 state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
268 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
272 status = wrepl_pull_table_recv(state->req, state, &state->table_io);
273 NT_STATUS_NOT_OK_RETURN(status);
275 state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
280 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
282 struct composite_context *c = state->c;
284 switch (state->stage) {
285 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
286 c->status = wreplsrv_pull_table_wait_connection(state);
288 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
289 c->status = wreplsrv_pull_table_wait_table_reply(state);
290 c->state = COMPOSITE_STATE_DONE;
292 case WREPLSRV_PULL_TABLE_STAGE_DONE:
293 c->status = NT_STATUS_INTERNAL_ERROR;
296 if (!NT_STATUS_IS_OK(c->status)) {
297 c->state = COMPOSITE_STATE_ERROR;
300 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
305 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
307 struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
308 struct wreplsrv_pull_table_state);
309 wreplsrv_pull_table_handler(state);
313 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
315 struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private,
316 struct wreplsrv_pull_table_state);
317 wreplsrv_pull_table_handler(state);
321 struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
323 struct composite_context *c = NULL;
324 struct wreplsrv_service *service = io->in.partner->service;
325 struct wreplsrv_pull_table_state *state = NULL;
327 c = talloc_zero(mem_ctx, struct composite_context);
330 state = talloc_zero(c, struct wreplsrv_pull_table_state);
331 if (!state) goto failed;
335 c->state = COMPOSITE_STATE_IN_PROGRESS;
336 c->event_ctx = service->task->event_ctx;
337 c->private_data = state;
339 if (io->in.num_owners) {
340 state->table_io.out.num_partners = io->in.num_owners;
341 state->table_io.out.partners = io->in.owners;
342 state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
347 state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
348 state->creq = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
349 if (!state->creq) goto failed;
351 state->creq->async.fn = wreplsrv_pull_table_handler_creq;
352 state->creq->async.private_data = state;
360 NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
361 struct wreplsrv_pull_table_io *io)
365 status = composite_wait(c);
367 if (NT_STATUS_IS_OK(status)) {
368 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
369 struct wreplsrv_pull_table_state);
370 io->out.num_owners = state->table_io.out.num_partners;
371 io->out.owners = state->table_io.out.partners;
372 talloc_reference(mem_ctx, state->table_io.out.partners);
379 enum wreplsrv_pull_names_stage {
380 WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
381 WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
382 WREPLSRV_PULL_NAMES_STAGE_DONE
385 struct wreplsrv_pull_names_state {
386 enum wreplsrv_pull_names_stage stage;
387 struct composite_context *c;
388 struct wrepl_request *req;
389 struct wrepl_pull_names pull_io;
390 struct wreplsrv_pull_names_io *io;
391 struct composite_context *creq;
392 struct wreplsrv_out_connection *wreplconn;
395 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
397 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
401 status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
402 NT_STATUS_NOT_OK_RETURN(status);
404 state->pull_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
405 state->pull_io.in.partner = state->io->in.owner;
406 state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
407 NT_STATUS_HAVE_NO_MEMORY(state->req);
409 state->req->async.fn = wreplsrv_pull_names_handler_req;
410 state->req->async.private = state;
412 state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
417 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
421 status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
422 NT_STATUS_NOT_OK_RETURN(status);
424 state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
429 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
431 struct composite_context *c = state->c;
433 switch (state->stage) {
434 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
435 c->status = wreplsrv_pull_names_wait_connection(state);
437 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
438 c->status = wreplsrv_pull_names_wait_send_reply(state);
439 c->state = COMPOSITE_STATE_DONE;
441 case WREPLSRV_PULL_NAMES_STAGE_DONE:
442 c->status = NT_STATUS_INTERNAL_ERROR;
445 if (!NT_STATUS_IS_OK(c->status)) {
446 c->state = COMPOSITE_STATE_ERROR;
449 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
454 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
456 struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
457 struct wreplsrv_pull_names_state);
458 wreplsrv_pull_names_handler(state);
462 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
464 struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private,
465 struct wreplsrv_pull_names_state);
466 wreplsrv_pull_names_handler(state);
470 struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
472 struct composite_context *c = NULL;
473 struct wreplsrv_service *service = io->in.partner->service;
474 struct wreplsrv_pull_names_state *state = NULL;
475 enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
477 if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
479 c = talloc_zero(mem_ctx, struct composite_context);
482 state = talloc_zero(c, struct wreplsrv_pull_names_state);
483 if (!state) goto failed;
487 c->state = COMPOSITE_STATE_IN_PROGRESS;
488 c->event_ctx = service->task->event_ctx;
489 c->private_data = state;
491 state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
492 state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
493 if (!state->creq) goto failed;
495 state->creq->async.fn = wreplsrv_pull_names_handler_creq;
496 state->creq->async.private_data = state;
504 NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
505 struct wreplsrv_pull_names_io *io)
509 status = composite_wait(c);
511 if (NT_STATUS_IS_OK(status)) {
512 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
513 struct wreplsrv_pull_names_state);
514 io->out.num_names = state->pull_io.out.num_names;
515 io->out.names = state->pull_io.out.names;
516 talloc_reference(mem_ctx, state->pull_io.out.names);
524 enum wreplsrv_pull_cycle_stage {
525 WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
526 WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
527 WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
528 WREPLSRV_PULL_CYCLE_STAGE_DONE
531 struct wreplsrv_pull_cycle_state {
532 enum wreplsrv_pull_cycle_stage stage;
533 struct composite_context *c;
534 struct wreplsrv_pull_cycle_io *io;
535 struct wreplsrv_pull_table_io table_io;
537 struct wreplsrv_pull_names_io names_io;
538 struct composite_context *creq;
539 struct wrepl_associate_stop assoc_stop_io;
540 struct wrepl_request *req;
543 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
544 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
546 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
548 struct wreplsrv_owner *current_owner;
549 struct wreplsrv_owner *local_owner;
551 uint64_t old_max_version = 0;
552 BOOL do_pull = False;
554 for (i=state->current; i < state->table_io.out.num_owners; i++) {
555 current_owner = wreplsrv_find_owner(state->io->in.partner->service,
556 state->io->in.partner->pull.table,
557 state->table_io.out.owners[i].address);
559 local_owner = wreplsrv_find_owner(state->io->in.partner->service,
560 state->io->in.partner->service->table,
561 state->table_io.out.owners[i].address);
563 * this means we are ourself the current owner,
564 * and we don't want replicate ourself
566 if (!current_owner) continue;
569 * this means we don't have any records of this owner
579 * this means the remote partner has some new records of this owner
582 if (current_owner->owner.max_version > local_owner->owner.max_version) {
584 old_max_version = local_owner->owner.max_version;
591 state->names_io.in.partner = state->io->in.partner;
592 state->names_io.in.wreplconn = state->io->in.wreplconn;
593 state->names_io.in.owner = current_owner->owner;
594 state->names_io.in.owner.min_version = old_max_version + 1;
595 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
596 NT_STATUS_HAVE_NO_MEMORY(state->creq);
598 state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
599 state->creq->async.private_data = state;
601 return STATUS_MORE_ENTRIES;
607 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
611 status = wreplsrv_pull_cycle_next_owner_do_work(state);
612 if (NT_STATUS_IS_OK(status)) {
613 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
614 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
615 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
616 status = NT_STATUS_OK;
619 if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
620 state->assoc_stop_io.in.assoc_ctx = state->io->in.wreplconn->assoc_ctx.peer_ctx;
621 state->assoc_stop_io.in.reason = 0;
622 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
623 NT_STATUS_HAVE_NO_MEMORY(state->req);
625 state->req->async.fn = wreplsrv_pull_cycle_handler_req;
626 state->req->async.private = state;
628 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
634 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
639 status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
640 NT_STATUS_NOT_OK_RETURN(status);
642 /* update partner table */
643 for (i=0; i < state->table_io.out.num_owners; i++) {
644 status = wreplsrv_add_table(state->io->in.partner->service,
645 state->io->in.partner,
646 &state->io->in.partner->pull.table,
647 state->table_io.out.owners[i].address,
648 state->table_io.out.owners[i].max_version);
649 NT_STATUS_NOT_OK_RETURN(status);
652 status = wreplsrv_pull_cycle_next_owner_wrapper(state);
653 NT_STATUS_NOT_OK_RETURN(status);
658 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
662 status = wreplsrv_apply_records(state->io->in.partner, &state->names_io);
663 NT_STATUS_NOT_OK_RETURN(status);
665 talloc_free(state->names_io.out.names);
666 ZERO_STRUCT(state->names_io);
671 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
675 status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
676 NT_STATUS_NOT_OK_RETURN(status);
679 * TODO: this should maybe an async call,
680 * because we may need some network access
681 * for conflict resolving
683 status = wreplsrv_pull_cycle_apply_records(state);
684 NT_STATUS_NOT_OK_RETURN(status);
686 status = wreplsrv_pull_cycle_next_owner_wrapper(state);
687 NT_STATUS_NOT_OK_RETURN(status);
692 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
696 status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
697 NT_STATUS_NOT_OK_RETURN(status);
699 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
704 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
706 struct composite_context *c = state->c;
708 switch (state->stage) {
709 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
710 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
712 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
713 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
715 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
716 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
718 case WREPLSRV_PULL_CYCLE_STAGE_DONE:
719 c->status = NT_STATUS_INTERNAL_ERROR;
722 if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
723 c->state = COMPOSITE_STATE_DONE;
726 if (!NT_STATUS_IS_OK(c->status)) {
727 c->state = COMPOSITE_STATE_ERROR;
730 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
735 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
737 struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
738 struct wreplsrv_pull_cycle_state);
739 wreplsrv_pull_cycle_handler(state);
743 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
745 struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private,
746 struct wreplsrv_pull_cycle_state);
747 wreplsrv_pull_cycle_handler(state);
751 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
753 struct composite_context *c = NULL;
754 struct wreplsrv_service *service = io->in.partner->service;
755 struct wreplsrv_pull_cycle_state *state = NULL;
757 c = talloc_zero(mem_ctx, struct composite_context);
760 state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
761 if (!state) goto failed;
765 c->state = COMPOSITE_STATE_IN_PROGRESS;
766 c->event_ctx = service->task->event_ctx;
767 c->private_data = state;
769 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
770 state->table_io.in.partner = io->in.partner;
771 state->table_io.in.num_owners = io->in.num_owners;
772 state->table_io.in.owners = io->in.owners;
773 state->creq = wreplsrv_pull_table_send(state, &state->table_io);
774 if (!state->creq) goto failed;
776 state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
777 state->creq->async.private_data = state;
785 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
789 status = composite_wait(c);
795 enum wreplsrv_push_notify_stage {
796 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
797 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
798 WREPLSRV_PUSH_NOTIFY_STAGE_DONE
801 struct wreplsrv_push_notify_state {
802 enum wreplsrv_push_notify_stage stage;
803 struct composite_context *c;
804 struct wreplsrv_push_notify_io *io;
805 enum wrepl_replication_cmd command;
807 struct wrepl_send_ctrl ctrl;
808 struct wrepl_request *req;
809 struct wrepl_packet req_packet;
810 struct wrepl_packet *rep_packet;
811 struct composite_context *creq;
812 struct wreplsrv_out_connection *wreplconn;
815 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
816 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
818 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
820 struct wreplsrv_service *service = state->io->in.partner->service;
821 struct wrepl_packet *req = &state->req_packet;
822 struct wrepl_replication *repl_out = &state->req_packet.message.replication;
823 struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
824 struct wreplsrv_in_connection *wrepl_in;
826 struct socket_context *sock;
827 struct packet_context *packet;
831 /* prepare the outgoing request */
832 req->opcode = WREPL_OPCODE_BITS;
833 req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
834 req->mess_type = WREPL_REPLICATION;
836 repl_out->command = state->command;
838 our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
839 NT_STATUS_HAVE_NO_MEMORY(our_ip);
841 status = wreplsrv_fill_wrepl_table(service, state, table_out,
842 our_ip, state->full_table);
843 NT_STATUS_NOT_OK_RETURN(status);
845 /* queue the request */
846 state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
847 NT_STATUS_HAVE_NO_MEMORY(state->req);
850 * now we need to convert the wrepl_socket (client connection)
851 * into a wreplsrv_in_connection (server connection), because
852 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
853 * message is received by the peer.
856 /* steal the socket_context */
857 sock = state->wreplconn->sock->sock;
858 state->wreplconn->sock->sock = NULL;
859 talloc_steal(state, sock);
862 * steal the packet_context
863 * note the request DATA_BLOB we just send on the
864 * wrepl_socket (client connection) is still unter the
865 * packet context and will be send to the wire
867 packet = state->wreplconn->sock->packet;
868 state->wreplconn->sock->packet = NULL;
869 talloc_steal(state, packet);
872 * get the fde_flags of the old fde event,
873 * so that we can later set the same flags to the new one
875 fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
878 * free the wrepl_socket (client connection)
880 talloc_free(state->wreplconn->sock);
881 state->wreplconn->sock = NULL;
884 * now create a wreplsrv_in_connection,
885 * on which we act as server
887 * NOTE: sock and packet will be stolen by
888 * wreplsrv_in_connection_merge()
890 status = wreplsrv_in_connection_merge(state->io->in.partner,
891 sock, packet, &wrepl_in);
892 NT_STATUS_NOT_OK_RETURN(status);
894 event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
896 wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx;
897 wrepl_in->assoc_ctx.our_ctx = 0;
899 /* now we can free the wreplsrv_out_connection */
900 talloc_free(state->wreplconn);
901 state->wreplconn = NULL;
903 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
908 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
910 struct wreplsrv_service *service = state->io->in.partner->service;
911 struct wrepl_packet *req = &state->req_packet;
912 struct wrepl_replication *repl_out = &state->req_packet.message.replication;
913 struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
917 req->opcode = WREPL_OPCODE_BITS;
918 req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
919 req->mess_type = WREPL_REPLICATION;
921 repl_out->command = state->command;
923 our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
924 NT_STATUS_HAVE_NO_MEMORY(our_ip);
926 status = wreplsrv_fill_wrepl_table(service, state, table_out,
927 our_ip, state->full_table);
928 NT_STATUS_NOT_OK_RETURN(status);
930 /* we won't get a reply to a inform message */
931 state->ctrl.send_only = True;
933 state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
934 NT_STATUS_HAVE_NO_MEMORY(state->req);
936 state->req->async.fn = wreplsrv_push_notify_handler_req;
937 state->req->async.private = state;
939 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
944 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
948 status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
949 NT_STATUS_NOT_OK_RETURN(status);
951 switch (state->command) {
952 case WREPL_REPL_UPDATE:
953 state->full_table = True;
954 return wreplsrv_push_notify_update(state);
955 case WREPL_REPL_UPDATE2:
956 state->full_table = False;
957 return wreplsrv_push_notify_update(state);
958 case WREPL_REPL_INFORM:
959 state->full_table = True;
960 return wreplsrv_push_notify_inform(state);
961 case WREPL_REPL_INFORM2:
962 state->full_table = False;
963 return wreplsrv_push_notify_inform(state);
965 return NT_STATUS_INTERNAL_ERROR;
968 return NT_STATUS_INTERNAL_ERROR;
971 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
975 status = wrepl_request_recv(state->req, state, NULL);
976 NT_STATUS_NOT_OK_RETURN(status);
978 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
982 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
984 struct composite_context *c = state->c;
986 switch (state->stage) {
987 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
988 c->status = wreplsrv_push_notify_wait_connect(state);
990 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
991 c->status = wreplsrv_push_notify_wait_inform(state);
993 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
994 c->status = NT_STATUS_INTERNAL_ERROR;
997 if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
998 c->state = COMPOSITE_STATE_DONE;
1001 if (!NT_STATUS_IS_OK(c->status)) {
1002 c->state = COMPOSITE_STATE_ERROR;
1005 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1010 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1012 struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1013 struct wreplsrv_push_notify_state);
1014 wreplsrv_push_notify_handler(state);
1018 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1020 struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
1021 struct wreplsrv_push_notify_state);
1022 wreplsrv_push_notify_handler(state);
1026 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1028 struct composite_context *c = NULL;
1029 struct wreplsrv_service *service = io->in.partner->service;
1030 struct wreplsrv_push_notify_state *state = NULL;
1031 enum winsrepl_partner_type partner_type;
1033 c = talloc_zero(mem_ctx, struct composite_context);
1034 if (!c) goto failed;
1036 state = talloc_zero(c, struct wreplsrv_push_notify_state);
1037 if (!state) goto failed;
1041 if (io->in.inform) {
1042 /* we can cache the connection in partner->push->wreplconn */
1043 partner_type = WINSREPL_PARTNER_PUSH;
1044 if (io->in.propagate) {
1045 state->command = WREPL_REPL_INFORM2;
1047 state->command = WREPL_REPL_INFORM;
1050 /* we can NOT cache the connection */
1051 partner_type = WINSREPL_PARTNER_NONE;
1052 if (io->in.propagate) {
1053 state->command = WREPL_REPL_UPDATE2;
1055 state->command = WREPL_REPL_UPDATE;
1059 c->state = COMPOSITE_STATE_IN_PROGRESS;
1060 c->event_ctx = service->task->event_ctx;
1061 c->private_data = state;
1063 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1064 state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1065 if (!state->creq) goto failed;
1067 state->creq->async.fn = wreplsrv_push_notify_handler_creq;
1068 state->creq->async.private_data = state;
1076 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1080 status = composite_wait(c);