2 Unix SMB/CIFS implementation.
4 WINS Replication server
6 Copyright (C) Stefan Metzmacher 2005
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #include "lib/events/events.h"
25 #include "lib/socket/socket.h"
26 #include "smbd/service_task.h"
27 #include "smbd/service_stream.h"
28 #include "librpc/gen_ndr/ndr_winsrepl.h"
29 #include "wrepl_server/wrepl_server.h"
30 #include "wrepl_server/wrepl_out_helpers.h"
31 #include "libcli/composite/composite.h"
32 #include "libcli/wrepl/winsrepl.h"
/* Stages of the outgoing-connect state machine (see wreplsrv_out_connect_handler). */
34 enum wreplsrv_out_connect_stage {
/* the socket connect is pending; set right before wrepl_connect_send() is issued */
35 WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
/* the associate request is pending; set after wrepl_associate_send() succeeded */
36 WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
/* nothing is pending; the handler reports NT_STATUS_INTERNAL_ERROR when run in this stage */
37 WREPLSRV_OUT_CONNECT_STAGE_DONE
/* Per-request state of one outgoing connect; allocated as a talloc child of the composite context. */
40 struct wreplsrv_out_connect_state {
41 enum wreplsrv_out_connect_stage stage; /* current step of the state machine */
42 struct composite_context *c; /* composite context whose status and state the handler updates */
43 struct wrepl_request *req; /* pending associate request */
44 struct composite_context *c_req; /* pending socket connect request */
45 struct wrepl_associate assoc_io; /* in/out data of the associate call */
46 enum winsrepl_partner_type type; /* selects the push or pull cache slot on the partner */
47 struct wreplsrv_out_connection *wreplconn; /* connection being reused or set up; may be NULL */
/* Forward declarations of the completion callbacks of the connect state machine (defined further down). */
50 static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
51 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
/*
 * Stage WAIT_SOCKET: collect the result of the connect request (state->c_req)
 * and then queue the associate request on the connection socket, with
 * wreplsrv_out_connect_handler_req() as completion callback.
 * Afterwards the stage is WAIT_ASSOC_CTX.
 * NOTE(review): the status checking macros presumably return early on
 * failure; the final return statement is not visible in this listing.
 */
53 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
57 status = wrepl_connect_recv(state->c_req);
58 NT_STATUS_NOT_OK_RETURN(status);
/* the socket is connected, now ask the peer for an association context */
60 state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
61 NT_STATUS_HAVE_NO_MEMORY(state->req);
63 state->req->async.fn = wreplsrv_out_connect_handler_req;
64 state->req->async.private = state;
66 state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
/*
 * Stage WAIT_ASSOC_CTX: collect the associate reply and remember the
 * association context of the peer on the connection.
 * For push and pull partner types the connection is stored in the matching
 * slot of the partner and its talloc parent becomes the partner.
 * Afterwards the stage is DONE.
 */
71 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
75 status = wrepl_associate_recv(state->req, &state->assoc_io);
76 NT_STATUS_NOT_OK_RETURN(status);
78 state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
/* cache the connection on the partner so that later requests can reuse it */
80 if (state->type == WINSREPL_PARTNER_PUSH) {
81 state->wreplconn->partner->push.wreplconn = state->wreplconn;
82 talloc_steal(state->wreplconn->partner, state->wreplconn);
83 } else if (state->type == WINSREPL_PARTNER_PULL) {
84 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
85 talloc_steal(state->wreplconn->partner, state->wreplconn);
88 state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
/*
 * Central step function of the connect state machine: runs the function
 * that belongs to the current stage and stores its result in c->status.
 * Any non OK status puts the composite context into COMPOSITE_STATE_ERROR.
 * NOTE(review): the statements that end each case and the body of the
 * final if are not visible in this listing; presumably the final if
 * invokes c->async.fn - confirm against the full file.
 */
93 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
95 struct composite_context *c = state->c;
97 switch (state->stage) {
98 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
99 c->status = wreplsrv_out_connect_wait_socket(state);
101 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
102 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
103 c->state = COMPOSITE_STATE_DONE;
/* being called although nothing is pending is treated as an internal error */
105 case WREPLSRV_OUT_CONNECT_STAGE_DONE:
106 c->status = NT_STATUS_INTERNAL_ERROR;
109 if (!NT_STATUS_IS_OK(c->status)) {
110 c->state = COMPOSITE_STATE_ERROR;
/* only a finished (done or error) request with a registered callback passes this check */
113 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
/* Composite completion callback: recovers the connect state from async.private_data and runs the step function. */
118 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
120 struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
121 struct wreplsrv_out_connect_state);
122 wreplsrv_out_connect_handler(state);
/* Request completion callback: recovers the connect state from async.private and runs the step function. */
126 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
128 struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
129 struct wreplsrv_out_connect_state);
130 wreplsrv_out_connect_handler(state);
/*
 * Start (or short cut) the setup of an outgoing connection to a partner.
 *
 * partner   - replication partner; also the talloc parent of the returned context
 * type      - WINSREPL_PARTNER_PUSH or WINSREPL_PARTNER_PULL select the
 *             connection cached on the partner; for other values the
 *             wreplconn argument is used
 * wreplconn - caller supplied connection, only looked at when type selects
 *             no cached connection; may be NULL
 *
 * A usable existing connection is taken over directly. A dead cached
 * connection is freed and replaced by a new one. A dead caller supplied
 * connection yields stage DONE with a NULL connection.
 * NOTE(review): several lines are not visible in this listing (NULL check
 * of c, the test guarding the existing connection branch, the return
 * statements, the failed label and the last argument of
 * wrepl_connect_send); comments cover the visible lines only.
 */
134 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
135 enum winsrepl_partner_type type,
136 struct wreplsrv_out_connection *wreplconn)
138 struct composite_context *c = NULL;
139 struct wreplsrv_service *service = partner->service;
140 struct wreplsrv_out_connect_state *state = NULL;
141 struct wreplsrv_out_connection **wreplconnp = &wreplconn;
142 BOOL cached_connection = False;
144 c = talloc_zero(partner, struct composite_context);
147 state = talloc_zero(c, struct wreplsrv_out_connect_state);
148 if (!state) goto failed;
152 c->state = COMPOSITE_STATE_IN_PROGRESS;
153 c->event_ctx = service->task->event_ctx;
154 c->private_data = state;
/* push and pull partners keep their connection cached on the partner structure */
156 if (type == WINSREPL_PARTNER_PUSH) {
157 cached_connection = True;
158 wreplconn = partner->push.wreplconn;
159 wreplconnp = &partner->push.wreplconn;
160 } else if (type == WINSREPL_PARTNER_PULL) {
161 cached_connection = True;
162 wreplconn = partner->pull.wreplconn;
163 wreplconnp = &partner->pull.wreplconn;
166 /* we have a connection already, so use it */
168 if (!wreplconn->sock->dead) {
169 state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
170 state->wreplconn= wreplconn;
/* a dead connection that is not cached cannot be replaced here: finish without a connection */
173 } else if (!cached_connection) {
174 state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
175 state->wreplconn= NULL;
/* a dead cached connection is dropped; a fresh one is created below */
179 talloc_free(wreplconn);
/* the new connection starts as a child of state; the socket is a child of the connection */
184 wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
185 if (!wreplconn) goto failed;
187 wreplconn->service = service;
188 wreplconn->partner = partner;
189 wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx);
190 if (!wreplconn->sock) goto failed;
192 state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
193 state->wreplconn= wreplconn;
194 state->c_req = wrepl_connect_send(wreplconn->sock,
195 partner->our_address,
197 if (!state->c_req) goto failed;
/* the state machine continues in wreplsrv_out_connect_handler once the connect finishes */
199 state->c_req->async.fn = wreplsrv_out_connect_handler_creq;
200 state->c_req->async.private_data = state;
/*
 * Wait for a connect request and hand out the resulting connection.
 *
 * c         - context returned by wreplsrv_out_connect_send()
 * mem_ctx   - talloc context that gets a reference on the connection
 * wreplconn - receives the connection on success
 *
 * Yields NT_STATUS_NO_MEMORY when the reference cannot be taken.
 * NOTE(review): NT_STATUS_INVALID_CONNECTION is presumably the else branch
 * for a NULL connection; the else line and the cleanup/return lines are
 * not visible in this listing.
 */
208 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
209 struct wreplsrv_out_connection **wreplconn)
213 status = composite_wait(c);
215 if (NT_STATUS_IS_OK(status)) {
216 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
217 struct wreplsrv_out_connect_state);
218 if (state->wreplconn) {
/* the caller gets a talloc reference, the connection itself stays where it is */
219 *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
220 if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
222 status = NT_STATUS_INVALID_CONNECTION;
/* Stages of the pull-table state machine (see wreplsrv_pull_table_handler). */
231 enum wreplsrv_pull_table_stage {
/* the outgoing connection is being set up */
232 WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
/* the pull table request was sent and its reply is pending */
233 WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
/* nothing is pending any more */
234 WREPLSRV_PULL_TABLE_STAGE_DONE
/* Per-request state of one pull-table operation; allocated as a talloc child of the composite context. */
237 struct wreplsrv_pull_table_state {
238 enum wreplsrv_pull_table_stage stage; /* current step of the state machine */
239 struct composite_context *c; /* composite context whose status and state the handler updates */
240 struct wrepl_request *req; /* pending pull table request */
241 struct wrepl_pull_table table_io; /* in/out data of the wire level pull table call */
242 struct wreplsrv_pull_table_io *io; /* NOTE(review): presumably the caller io; its assignment is not visible here */
243 struct composite_context *creq; /* pending connect request */
244 struct wreplsrv_out_connection *wreplconn; /* connection obtained from the connect request */
/* Forward declaration: installed as request callback before its definition. */
247 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
/*
 * Stage WAIT_CONNECTION: collect the connection from the connect request
 * and send the pull table request on it, using the association context of
 * the peer. wreplsrv_pull_table_handler_req() is installed as callback and
 * the stage becomes WAIT_TABLE_REPLY.
 */
249 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
/* state is the memory context that references the connection */
253 status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
254 NT_STATUS_NOT_OK_RETURN(status);
256 state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
257 state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
258 NT_STATUS_HAVE_NO_MEMORY(state->req);
260 state->req->async.fn = wreplsrv_pull_table_handler_req;
261 state->req->async.private = state;
263 state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
/* Stage WAIT_TABLE_REPLY: collect the pull table reply into state->table_io (state is the memory context), then mark the stage DONE. */
268 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
272 status = wrepl_pull_table_recv(state->req, state, &state->table_io);
273 NT_STATUS_NOT_OK_RETURN(status);
275 state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
/*
 * Step function of the pull-table state machine: runs the function of the
 * current stage and stores its result in c->status. A non OK status puts
 * the composite context into COMPOSITE_STATE_ERROR.
 * NOTE(review): the statements that end each case and the body of the
 * final if are not visible in this listing; presumably the final if
 * invokes c->async.fn - confirm against the full file.
 */
280 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
282 struct composite_context *c = state->c;
284 switch (state->stage) {
285 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
286 c->status = wreplsrv_pull_table_wait_connection(state);
288 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
289 c->status = wreplsrv_pull_table_wait_table_reply(state);
290 c->state = COMPOSITE_STATE_DONE;
/* being called although nothing is pending is treated as an internal error */
292 case WREPLSRV_PULL_TABLE_STAGE_DONE:
293 c->status = NT_STATUS_INTERNAL_ERROR;
296 if (!NT_STATUS_IS_OK(c->status)) {
297 c->state = COMPOSITE_STATE_ERROR;
/* only a finished (done or error) request with a registered callback passes this check */
300 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
/* Composite completion callback: recovers the pull-table state from async.private_data and runs the step function. */
305 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
307 struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
308 struct wreplsrv_pull_table_state);
309 wreplsrv_pull_table_handler(state);
/* Request completion callback: recovers the pull-table state from async.private and runs the step function. */
313 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
315 struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private,
316 struct wreplsrv_pull_table_state);
317 wreplsrv_pull_table_handler(state);
/*
 * Start fetching the owner table of a partner.
 *
 * mem_ctx - talloc parent of the returned composite context
 * io      - io->in.partner is the partner to ask; when io->in.num_owners
 *           is non zero the supplied io->in.owners are used as the result
 *           and the stage is set to DONE without starting a connection
 *
 * Otherwise a connect request of type WINSREPL_PARTNER_PULL is started and
 * the state machine continues in wreplsrv_pull_table_handler.
 * NOTE(review): the NULL check of c, the assignments right after the
 * allocation of state, the return statements and the failed label are not
 * visible in this listing.
 */
321 struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
323 struct composite_context *c = NULL;
324 struct wreplsrv_service *service = io->in.partner->service;
325 struct wreplsrv_pull_table_state *state = NULL;
327 c = talloc_zero(mem_ctx, struct composite_context);
330 state = talloc_zero(c, struct wreplsrv_pull_table_state);
331 if (!state) goto failed;
335 c->state = COMPOSITE_STATE_IN_PROGRESS;
336 c->event_ctx = service->task->event_ctx;
337 c->private_data = state;
/* the caller already knows the owners: pass them through as the result */
339 if (io->in.num_owners) {
340 state->table_io.out.num_partners = io->in.num_owners;
341 state->table_io.out.partners = io->in.owners;
342 state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
347 state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
348 state->creq = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
349 if (!state->creq) goto failed;
351 state->creq->async.fn = wreplsrv_pull_table_handler_creq;
352 state->creq->async.private_data = state;
/*
 * Wait for a pull-table request and copy the owner table to the caller.
 *
 * c       - context returned by wreplsrv_pull_table_send()
 * mem_ctx - talloc context that gets a reference on the owner array
 * io      - io->out.num_owners and io->out.owners receive the result
 *
 * NOTE(review): the result of talloc_reference() is not checked here.
 * The cleanup and return lines are not visible in this listing.
 */
360 NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
361 struct wreplsrv_pull_table_io *io)
365 status = composite_wait(c);
367 if (NT_STATUS_IS_OK(status)) {
368 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
369 struct wreplsrv_pull_table_state);
370 io->out.num_owners = state->table_io.out.num_partners;
371 io->out.owners = state->table_io.out.partners;
372 talloc_reference(mem_ctx, state->table_io.out.partners);
/* Stages of the pull-names state machine (see wreplsrv_pull_names_handler). */
379 enum wreplsrv_pull_names_stage {
/* the outgoing connection is being set up or looked up */
380 WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
/* the pull names request was sent and its reply is pending */
381 WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
/* nothing is pending any more */
382 WREPLSRV_PULL_NAMES_STAGE_DONE
/* Per-request state of one pull-names operation; allocated as a talloc child of the composite context. */
385 struct wreplsrv_pull_names_state {
386 enum wreplsrv_pull_names_stage stage; /* current step of the state machine */
387 struct composite_context *c; /* composite context whose status and state the handler updates */
388 struct wrepl_request *req; /* pending pull names request */
389 struct wrepl_pull_names pull_io; /* in/out data of the wire level pull names call */
390 struct wreplsrv_pull_names_io *io; /* caller io; io->in.owner selects the owner to pull */
391 struct composite_context *creq; /* pending connect request */
392 struct wreplsrv_out_connection *wreplconn; /* connection obtained from the connect request */
/* Forward declaration: installed as request callback before its definition. */
395 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
/*
 * Stage WAIT_CONNECTION: collect the connection from the connect request
 * and send the pull names request for the owner given in state->io->in.owner.
 * wreplsrv_pull_names_handler_req() is installed as callback and the stage
 * becomes WAIT_SEND_REPLY.
 */
397 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
/* state is the memory context that references the connection */
401 status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
402 NT_STATUS_NOT_OK_RETURN(status);
404 state->pull_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
405 state->pull_io.in.partner = state->io->in.owner;
406 state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
407 NT_STATUS_HAVE_NO_MEMORY(state->req);
409 state->req->async.fn = wreplsrv_pull_names_handler_req;
410 state->req->async.private = state;
412 state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
/* Stage WAIT_SEND_REPLY: collect the pull names reply into state->pull_io (state is the memory context), then mark the stage DONE. */
417 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
421 status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
422 NT_STATUS_NOT_OK_RETURN(status);
424 state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
/*
 * Step function of the pull-names state machine: runs the function of the
 * current stage and stores its result in c->status. A non OK status puts
 * the composite context into COMPOSITE_STATE_ERROR.
 * NOTE(review): the statements that end each case and the body of the
 * final if are not visible in this listing; presumably the final if
 * invokes c->async.fn - confirm against the full file.
 */
429 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
431 struct composite_context *c = state->c;
433 switch (state->stage) {
434 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
435 c->status = wreplsrv_pull_names_wait_connection(state);
437 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
438 c->status = wreplsrv_pull_names_wait_send_reply(state);
439 c->state = COMPOSITE_STATE_DONE;
/* being called although nothing is pending is treated as an internal error */
441 case WREPLSRV_PULL_NAMES_STAGE_DONE:
442 c->status = NT_STATUS_INTERNAL_ERROR;
445 if (!NT_STATUS_IS_OK(c->status)) {
446 c->state = COMPOSITE_STATE_ERROR;
/* only a finished (done or error) request with a registered callback passes this check */
449 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
/* Composite completion callback: recovers the pull-names state from async.private_data and runs the step function. */
454 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
456 struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
457 struct wreplsrv_pull_names_state);
458 wreplsrv_pull_names_handler(state);
/* Request completion callback: recovers the pull-names state from async.private and runs the step function. */
462 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
464 struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private,
465 struct wreplsrv_pull_names_state);
466 wreplsrv_pull_names_handler(state);
/*
 * Start pulling the name records of one owner from a partner.
 *
 * mem_ctx - talloc parent of the returned composite context
 * io      - io->in.partner is the partner to ask; io->in.wreplconn is an
 *           optional connection to use instead of the cached pull connection
 *
 * NOTE(review): the NULL check of c, the assignments right after the
 * allocation of state, the return statements and the failed label are not
 * visible in this listing.
 */
470 struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
472 struct composite_context *c = NULL;
473 struct wreplsrv_service *service = io->in.partner->service;
474 struct wreplsrv_pull_names_state *state = NULL;
475 enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
/* with an explicit connection the partner cache is bypassed */
477 if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
479 c = talloc_zero(mem_ctx, struct composite_context);
482 state = talloc_zero(c, struct wreplsrv_pull_names_state);
483 if (!state) goto failed;
487 c->state = COMPOSITE_STATE_IN_PROGRESS;
488 c->event_ctx = service->task->event_ctx;
489 c->private_data = state;
491 state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
492 state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
493 if (!state->creq) goto failed;
/* the state machine continues in wreplsrv_pull_names_handler once the connection is available */
495 state->creq->async.fn = wreplsrv_pull_names_handler_creq;
496 state->creq->async.private_data = state;
/*
 * Wait for a pull-names request and copy the received names to the caller.
 *
 * c       - context returned by wreplsrv_pull_names_send()
 * mem_ctx - talloc context that gets a reference on the names array
 * io      - io->out.num_names and io->out.names receive the result
 *
 * NOTE(review): the result of talloc_reference() is not checked here.
 * The cleanup and return lines are not visible in this listing.
 */
504 NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
505 struct wreplsrv_pull_names_io *io)
509 status = composite_wait(c);
511 if (NT_STATUS_IS_OK(status)) {
512 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
513 struct wreplsrv_pull_names_state);
514 io->out.num_names = state->pull_io.out.num_names;
515 io->out.names = state->pull_io.out.names;
516 talloc_reference(mem_ctx, state->pull_io.out.names);
/* Stages of the pull-cycle state machine (see wreplsrv_pull_cycle_handler). */
524 enum wreplsrv_pull_cycle_stage {
/* the owner table request is pending */
525 WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
/* a pull names request for one owner is pending */
526 WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
/* the associate stop request is pending */
527 WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
/* the cycle is complete */
528 WREPLSRV_PULL_CYCLE_STAGE_DONE
/*
 * Per-request state of one pull cycle; allocated as a talloc child of the
 * composite context.
 * NOTE(review): code below reads state->current, whose declaration is not
 * visible in this listing (one line is missing between table_io and names_io).
 */
531 struct wreplsrv_pull_cycle_state {
532 enum wreplsrv_pull_cycle_stage stage; /* current step of the state machine */
533 struct composite_context *c; /* composite context whose status and state the handler updates */
534 struct wreplsrv_pull_cycle_io *io; /* caller io (partner, optional connection) */
535 struct wreplsrv_pull_table_io table_io; /* in/out data of the owner table sub request */
537 struct wreplsrv_pull_names_io names_io; /* in/out data of the current pull names sub request */
538 struct composite_context *creq; /* pending table or names sub request */
539 struct wrepl_associate_stop assoc_stop_io; /* in/out data of the associate stop call */
540 struct wrepl_request *req; /* pending associate stop request */
/* Forward declarations of the completion callbacks of the pull-cycle state machine (defined further down). */
543 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
544 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
/*
 * Look for the next owner, starting at index state->current of the owner
 * table, and start a pull names request for it.
 *
 * For each owner the entry in the pull table of the partner (current_owner)
 * and the entry in the table of the local service (local_owner) are looked
 * up by address. The request asks for versions starting at
 * old_max_version + 1 and STATUS_MORE_ENTRIES is returned after it was sent.
 * NOTE(review): a number of lines are not visible in this listing (the
 * handling of a missing local_owner, the use of do_pull, the update of
 * state->current, the condition guarding the send and the final return);
 * confirm the exact selection rules against the full file.
 */
546 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
548 struct wreplsrv_owner *current_owner;
549 struct wreplsrv_owner *local_owner;
551 uint64_t old_max_version = 0;
552 BOOL do_pull = False;
554 for (i=state->current; i < state->table_io.out.num_owners; i++) {
555 current_owner = wreplsrv_find_owner(state->io->in.partner->pull.table,
556 state->table_io.out.owners[i].address);
558 local_owner = wreplsrv_find_owner(state->io->in.partner->service->table,
559 state->table_io.out.owners[i].address);
561 * this means we are ourself the current owner,
562 * and we don't want replicate ourself
564 if (!current_owner) continue;
567 * this means we don't have any records of this owner
577 * this means the remote partner has some new records of this owner
580 if (current_owner->owner.max_version > local_owner->owner.max_version) {
582 old_max_version = local_owner->owner.max_version;
/* set up the pull names sub request for the selected owner */
589 state->names_io.in.partner = state->io->in.partner;
590 state->names_io.in.wreplconn = state->io->in.wreplconn;
591 state->names_io.in.owner = current_owner->owner;
/* only versions above the highest one already held locally are requested */
592 state->names_io.in.owner.min_version = old_max_version + 1;
593 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
594 NT_STATUS_HAVE_NO_MEMORY(state->creq);
596 state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
597 state->creq->async.private_data = state;
599 return STATUS_MORE_ENTRIES;
/*
 * Advance the cycle to the next owner and translate the result into a stage:
 * NT_STATUS_OK means no further owner (stage DONE), STATUS_MORE_ENTRIES
 * means a pull names request is pending (stage WAIT_SEND_REPLIES, status
 * mapped to NT_STATUS_OK).
 * When the cycle is done and the caller supplied a connection, an associate
 * stop request is sent on it and the stage becomes WAIT_STOP_ASSOC.
 * NOTE(review): the final return statement is not visible in this listing.
 */
605 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
609 status = wreplsrv_pull_cycle_next_owner_do_work(state);
610 if (NT_STATUS_IS_OK(status)) {
611 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
612 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
613 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
614 status = NT_STATUS_OK;
/* a caller supplied connection gets its association stopped at the end of the cycle */
617 if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
618 state->assoc_stop_io.in.assoc_ctx = state->io->in.wreplconn->assoc_ctx.peer_ctx;
619 state->assoc_stop_io.in.reason = 0;
620 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
621 NT_STATUS_HAVE_NO_MEMORY(state->req);
623 state->req->async.fn = wreplsrv_pull_cycle_handler_req;
624 state->req->async.private = state;
626 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
/*
 * Stage WAIT_TABLE_REPLY: collect the owner table, record every owner that
 * is not one of our own addresses in the pull table of the partner, then
 * start working on the first owner.
 */
632 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
637 status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
638 NT_STATUS_NOT_OK_RETURN(status);
640 /* update partner table */
641 for (i=0; i < state->table_io.out.num_owners; i++) {
/* entries with one of our own addresses are skipped */
644 is_our_addr = wreplsrv_is_our_address(state->io->in.partner->service,
645 state->table_io.out.owners[i].address);
646 if (is_our_addr) continue;
648 status = wreplsrv_add_table(state->io->in.partner->service,
649 state->io->in.partner,
650 &state->io->in.partner->pull.table,
651 state->table_io.out.owners[i].address,
652 state->table_io.out.owners[i].max_version);
653 NT_STATUS_NOT_OK_RETURN(status);
656 status = wreplsrv_pull_cycle_next_owner_wrapper(state);
657 NT_STATUS_NOT_OK_RETURN(status);
/* Apply the names received in state->names_io for the partner, then free the names array and clear names_io for the next owner. */
662 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
666 status = wreplsrv_apply_records(state->io->in.partner, &state->names_io);
667 NT_STATUS_NOT_OK_RETURN(status);
669 talloc_free(state->names_io.out.names);
670 ZERO_STRUCT(state->names_io);
/*
 * Stage WAIT_SEND_REPLIES: collect the names of the current owner, apply
 * them and move on to the next owner.
 */
675 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
679 status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
680 NT_STATUS_NOT_OK_RETURN(status);
683 * TODO: this should maybe an async call,
684 * because we may need some network access
685 * for conflict resolving
687 status = wreplsrv_pull_cycle_apply_records(state);
688 NT_STATUS_NOT_OK_RETURN(status);
690 status = wreplsrv_pull_cycle_next_owner_wrapper(state);
691 NT_STATUS_NOT_OK_RETURN(status);
/* Stage WAIT_STOP_ASSOC: collect the result of the associate stop request, then mark the cycle DONE. */
696 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
700 status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
701 NT_STATUS_NOT_OK_RETURN(status);
703 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
/*
 * Step function of the pull-cycle state machine: runs the function of the
 * current stage and stores its result in c->status. Reaching stage DONE
 * marks the composite context as done; a non OK status marks it as error
 * (the error check comes later and therefore wins).
 * NOTE(review): the statements that end each case and the body of the
 * final if are not visible in this listing; presumably the final if
 * invokes c->async.fn - confirm against the full file.
 */
708 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
710 struct composite_context *c = state->c;
712 switch (state->stage) {
713 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
714 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
716 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
717 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
719 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
720 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
/* being called although nothing is pending is treated as an internal error */
722 case WREPLSRV_PULL_CYCLE_STAGE_DONE:
723 c->status = NT_STATUS_INTERNAL_ERROR;
726 if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
727 c->state = COMPOSITE_STATE_DONE;
730 if (!NT_STATUS_IS_OK(c->status)) {
731 c->state = COMPOSITE_STATE_ERROR;
/* only a finished (done or error) request with a registered callback passes this check */
734 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
/* Composite completion callback: recovers the pull-cycle state from async.private_data and runs the step function. */
739 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
741 struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
742 struct wreplsrv_pull_cycle_state);
743 wreplsrv_pull_cycle_handler(state);
/* Request completion callback: recovers the pull-cycle state from async.private and runs the step function. */
747 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
749 struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private,
750 struct wreplsrv_pull_cycle_state);
751 wreplsrv_pull_cycle_handler(state);
/*
 * Start a complete pull cycle against a partner: the owner table is
 * requested first, the rest of the cycle is driven by
 * wreplsrv_pull_cycle_handler.
 *
 * mem_ctx - talloc parent of the returned composite context
 * io      - io->in.partner is the partner; io->in.num_owners and
 *           io->in.owners are passed on to the table request
 *
 * NOTE(review): the NULL check of c, the assignments right after the
 * allocation of state, the return statements and the failed label are not
 * visible in this listing.
 */
755 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
757 struct composite_context *c = NULL;
758 struct wreplsrv_service *service = io->in.partner->service;
759 struct wreplsrv_pull_cycle_state *state = NULL;
761 c = talloc_zero(mem_ctx, struct composite_context);
764 state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
765 if (!state) goto failed;
769 c->state = COMPOSITE_STATE_IN_PROGRESS;
770 c->event_ctx = service->task->event_ctx;
771 c->private_data = state;
773 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
774 state->table_io.in.partner = io->in.partner;
775 state->table_io.in.num_owners = io->in.num_owners;
776 state->table_io.in.owners = io->in.owners;
/* the table sub request is a talloc child of state */
777 state->creq = wreplsrv_pull_table_send(state, &state->table_io);
778 if (!state->creq) goto failed;
780 state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
781 state->creq->async.private_data = state;
/*
 * Wait for a pull cycle started with wreplsrv_pull_cycle_send().
 * NOTE(review): only the composite_wait() call is visible in this listing;
 * cleanup and the return statement are not shown.
 */
789 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
793 status = composite_wait(c);
/* Stages of the push-notify state machine (see wreplsrv_push_notify_handler). */
799 enum wreplsrv_push_notify_stage {
/* the outgoing connection is being set up or looked up */
800 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
/* an inform message was queued and its send completion is pending */
801 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
/* nothing is pending any more */
802 WREPLSRV_PUSH_NOTIFY_STAGE_DONE
/*
 * Per-request state of one push notification; allocated as a talloc child
 * of the composite context.
 * NOTE(review): code below reads and writes state->full_table, whose
 * declaration is not visible in this listing (one line is missing between
 * command and ctrl).
 */
805 struct wreplsrv_push_notify_state {
806 enum wreplsrv_push_notify_stage stage; /* current step of the state machine */
807 struct composite_context *c; /* composite context whose status and state the handler updates */
808 struct wreplsrv_push_notify_io *io; /* caller io (partner, inform and propagate flags) */
809 enum wrepl_replication_cmd command; /* one of the WREPL_REPL_UPDATE or WREPL_REPL_INFORM variants */
811 struct wrepl_send_ctrl ctrl; /* send options; send_only is set for inform messages */
812 struct wrepl_request *req; /* pending replication request */
813 struct wrepl_packet req_packet; /* outgoing packet that is filled and sent */
814 struct wrepl_packet *rep_packet; /* NOTE(review): not used by the code visible in this listing */
815 struct composite_context *creq; /* pending connect request */
816 struct wreplsrv_out_connection *wreplconn; /* connection obtained from the connect request */
/* Forward declarations of the completion callbacks of the push-notify state machine (defined further down). */
819 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
820 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
/*
 * Send an update style replication message (state->command) together with
 * our owner table and then turn the outgoing client connection into an
 * incoming server connection.
 *
 * The socket and packet contexts are detached from the client socket, the
 * client socket is freed, and both are handed to
 * wreplsrv_in_connection_merge(). The fd event flags and the association
 * context of the peer are carried over to the new connection. Finally the
 * outgoing connection object is freed and the stage is set to DONE.
 * No completion callback is installed on the queued request here.
 * NOTE(review): some declarations (our_ip, fde_flags, status) and the final
 * return are not visible in this listing.
 */
822 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
824 struct wreplsrv_service *service = state->io->in.partner->service;
825 struct wrepl_packet *req = &state->req_packet;
826 struct wrepl_replication *repl_out = &state->req_packet.message.replication;
827 struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
828 struct wreplsrv_in_connection *wrepl_in;
830 struct socket_context *sock;
831 struct packet_context *packet;
835 /* prepare the outgoing request */
836 req->opcode = WREPL_OPCODE_BITS;
837 req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
838 req->mess_type = WREPL_REPLICATION;
840 repl_out->command = state->command;
/* our own address on this socket is passed twice to the table fill helper */
842 our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
843 NT_STATUS_HAVE_NO_MEMORY(our_ip);
845 status = wreplsrv_fill_wrepl_table(service, state, table_out,
846 our_ip, our_ip, state->full_table);
847 NT_STATUS_NOT_OK_RETURN(status);
849 /* queue the request */
850 state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
851 NT_STATUS_HAVE_NO_MEMORY(state->req);
854 * now we need to convert the wrepl_socket (client connection)
855 * into a wreplsrv_in_connection (server connection), because
856 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
857 * message is received by the peer.
860 /* steal the socket_context */
861 sock = state->wreplconn->sock->sock;
862 state->wreplconn->sock->sock = NULL;
863 talloc_steal(state, sock);
866 * steal the packet_context
867 * note the request DATA_BLOB we just send on the
868 * wrepl_socket (client connection) is still unter the
869 * packet context and will be send to the wire
871 packet = state->wreplconn->sock->packet;
872 state->wreplconn->sock->packet = NULL;
873 talloc_steal(state, packet);
876 * get the fde_flags of the old fde event,
877 * so that we can later set the same flags to the new one
879 fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
882 * free the wrepl_socket (client connection)
884 talloc_free(state->wreplconn->sock);
885 state->wreplconn->sock = NULL;
888 * now create a wreplsrv_in_connection,
889 * on which we act as server
891 * NOTE: sock and packet will be stolen by
892 * wreplsrv_in_connection_merge()
894 status = wreplsrv_in_connection_merge(state->io->in.partner,
895 sock, packet, &wrepl_in);
896 NT_STATUS_NOT_OK_RETURN(status);
898 event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
900 wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx;
901 wrepl_in->assoc_ctx.our_ctx = 0;
903 /* now we can free the wreplsrv_out_connection */
904 talloc_free(state->wreplconn);
905 state->wreplconn = NULL;
907 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
/*
 * Send an inform style replication message (state->command) together with
 * our owner table. The request is sent with send_only set, and
 * wreplsrv_push_notify_handler_req() is installed as completion callback.
 * Afterwards the stage is WAIT_INFORM.
 * NOTE(review): some declarations (our_ip, status) and the final return
 * are not visible in this listing.
 */
912 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
914 struct wreplsrv_service *service = state->io->in.partner->service;
915 struct wrepl_packet *req = &state->req_packet;
916 struct wrepl_replication *repl_out = &state->req_packet.message.replication;
917 struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
921 req->opcode = WREPL_OPCODE_BITS;
922 req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
923 req->mess_type = WREPL_REPLICATION;
925 repl_out->command = state->command;
/* our own address on this socket is passed twice to the table fill helper */
927 our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
928 NT_STATUS_HAVE_NO_MEMORY(our_ip);
930 status = wreplsrv_fill_wrepl_table(service, state, table_out,
931 our_ip, our_ip, state->full_table);
932 NT_STATUS_NOT_OK_RETURN(status);
934 /* we won't get a reply to an inform message */
935 state->ctrl.send_only = True;
937 state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
938 NT_STATUS_HAVE_NO_MEMORY(state->req);
940 state->req->async.fn = wreplsrv_push_notify_handler_req;
941 state->req->async.private = state;
943 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
/*
 * Stage WAIT_CONNECT: collect the connection and send the message that
 * matches state->command. UPDATE and INFORM set full_table to True, the
 * UPDATE2 and INFORM2 variants set it to False. Any other command value
 * ends in NT_STATUS_INTERNAL_ERROR.
 */
948 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
/* state is the memory context that references the connection */
952 status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
953 NT_STATUS_NOT_OK_RETURN(status);
955 switch (state->command) {
956 case WREPL_REPL_UPDATE:
957 state->full_table = True;
958 return wreplsrv_push_notify_update(state);
959 case WREPL_REPL_UPDATE2:
960 state->full_table = False;
961 return wreplsrv_push_notify_update(state);
962 case WREPL_REPL_INFORM:
963 state->full_table = True;
964 return wreplsrv_push_notify_inform(state);
965 case WREPL_REPL_INFORM2:
966 state->full_table = False;
967 return wreplsrv_push_notify_inform(state);
969 return NT_STATUS_INTERNAL_ERROR;
972 return NT_STATUS_INTERNAL_ERROR;
/* Stage WAIT_INFORM: collect the result of the inform request (no reply packet is requested, NULL is passed), then mark the stage DONE. */
975 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
979 status = wrepl_request_recv(state->req, state, NULL);
980 NT_STATUS_NOT_OK_RETURN(status);
982 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
/*
 * Step function of the push-notify state machine: runs the function of the
 * current stage and stores its result in c->status. Reaching stage DONE
 * marks the composite context as done; a non OK status marks it as error
 * (the error check comes later and therefore wins).
 * NOTE(review): the statements that end each case and the body of the
 * final if are not visible in this listing; presumably the final if
 * invokes c->async.fn - confirm against the full file.
 */
986 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
988 struct composite_context *c = state->c;
990 switch (state->stage) {
991 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
992 c->status = wreplsrv_push_notify_wait_connect(state);
994 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
995 c->status = wreplsrv_push_notify_wait_inform(state);
/* being called although nothing is pending is treated as an internal error */
997 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
998 c->status = NT_STATUS_INTERNAL_ERROR;
1001 if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1002 c->state = COMPOSITE_STATE_DONE;
1005 if (!NT_STATUS_IS_OK(c->status)) {
1006 c->state = COMPOSITE_STATE_ERROR;
/* only a finished (done or error) request with a registered callback passes this check */
1009 if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
/* Composite completion callback: recovers the push-notify state from async.private_data and runs the step function. */
1014 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1016 struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1017 struct wreplsrv_push_notify_state);
1018 wreplsrv_push_notify_handler(state);
/* Request completion callback: recovers the push-notify state from async.private and runs the step function. */
1022 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1024 struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
1025 struct wreplsrv_push_notify_state);
1026 wreplsrv_push_notify_handler(state);
/*
 * Start a push notification to a partner.
 *
 * mem_ctx - talloc parent of the returned composite context
 * io      - io->in.partner is the partner to notify;
 *           io->in.inform selects an inform message (cached push
 *           connection) instead of an update message (uncached connection);
 *           io->in.propagate selects the INFORM2 or UPDATE2 command variant
 *
 * NOTE(review): the else lines of the command selection, the assignments
 * right after the allocation of state, the return statements and the
 * failed label are not visible in this listing.
 */
1030 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1032 struct composite_context *c = NULL;
1033 struct wreplsrv_service *service = io->in.partner->service;
1034 struct wreplsrv_push_notify_state *state = NULL;
1035 enum winsrepl_partner_type partner_type;
1037 c = talloc_zero(mem_ctx, struct composite_context);
1038 if (!c) goto failed;
1040 state = talloc_zero(c, struct wreplsrv_push_notify_state);
1041 if (!state) goto failed;
1045 if (io->in.inform) {
1046 /* we can cache the connection in partner->push->wreplconn */
1047 partner_type = WINSREPL_PARTNER_PUSH;
1048 if (io->in.propagate) {
1049 state->command = WREPL_REPL_INFORM2;
1051 state->command = WREPL_REPL_INFORM;
1054 /* we can NOT cache the connection */
1055 partner_type = WINSREPL_PARTNER_NONE;
1056 if (io->in.propagate) {
1057 state->command = WREPL_REPL_UPDATE2;
1059 state->command = WREPL_REPL_UPDATE;
1063 c->state = COMPOSITE_STATE_IN_PROGRESS;
1064 c->event_ctx = service->task->event_ctx;
1065 c->private_data = state;
1067 state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1068 state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1069 if (!state->creq) goto failed;
/* the state machine continues in wreplsrv_push_notify_handler once the connection is available */
1071 state->creq->async.fn = wreplsrv_push_notify_handler_creq;
1072 state->creq->async.private_data = state;
/*
 * Wait for a push notification started with wreplsrv_push_notify_send().
 * NOTE(review): only the composite_wait() call is visible in this listing;
 * cleanup and the return statement are not shown.
 */
1080 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1084 status = composite_wait(c);