r12241: fix the inform push notifies
[bbaumbach/samba-autobuild/.git] / source4 / wrepl_server / wrepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "dlinklist.h"
25 #include "lib/events/events.h"
26 #include "lib/socket/socket.h"
27 #include "smbd/service_task.h"
28 #include "smbd/service_stream.h"
29 #include "lib/messaging/irpc.h"
30 #include "librpc/gen_ndr/ndr_winsrepl.h"
31 #include "wrepl_server/wrepl_server.h"
32 #include "wrepl_server/wrepl_out_helpers.h"
33 #include "nbt_server/wins/winsdb.h"
34 #include "ldb/include/ldb.h"
35 #include "libcli/composite/composite.h"
36 #include "libcli/wrepl/winsrepl.h"
37
/*
 * Stages of the outgoing-connection composite request, listed in the
 * order the state machine below moves through them.
 */
enum wreplsrv_out_connect_stage {
        /* wrepl_connect_send() is pending */
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
        /* wrepl_associate_send() is pending */
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
        /* nothing left to wait for */
        WREPLSRV_OUT_CONNECT_STAGE_DONE
};
43
/*
 * Private state of one outgoing-connection composite request
 * (created by wreplsrv_out_connect_send()).
 */
struct wreplsrv_out_connect_state {
        /* current position in the connect state machine */
        enum wreplsrv_out_connect_stage stage;
        /* composite context this state is attached to */
        struct composite_context *c;
        /* pending associate request (from wrepl_associate_send()) */
        struct wrepl_request *req;
        /* pending socket connect request (from wrepl_connect_send()) */
        struct composite_context *c_req;
        /* in/out parameters of the associate call */
        struct wrepl_associate assoc_io;
        /* partner type the connection was requested for */
        enum winsrepl_partner_type type;
        /* connection that is being reused or newly set up */
        struct wreplsrv_out_connection *wreplconn;
};
53
/*
 * Completion callbacks of the connect state machine; declared up front
 * because the stage functions install them before their definitions.
 */
static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
56
57 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
58 {
59         NTSTATUS status;
60
61         status = wrepl_connect_recv(state->c_req);
62         NT_STATUS_NOT_OK_RETURN(status);
63
64         state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
65         NT_STATUS_HAVE_NO_MEMORY(state->req);
66
67         state->req->async.fn            = wreplsrv_out_connect_handler_req;
68         state->req->async.private       = state;
69
70         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
71
72         return NT_STATUS_OK;
73 }
74
75 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
76 {
77         NTSTATUS status;
78
79         status = wrepl_associate_recv(state->req, &state->assoc_io);
80         NT_STATUS_NOT_OK_RETURN(status);
81
82         state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
83
84         if (state->type == WINSREPL_PARTNER_PUSH) {
85                 state->wreplconn->partner->push.wreplconn = state->wreplconn;
86                 talloc_steal(state->wreplconn->partner, state->wreplconn);
87         } else if (state->type == WINSREPL_PARTNER_PULL) {
88                 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
89                 talloc_steal(state->wreplconn->partner, state->wreplconn);
90         }
91
92         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
93
94         return NT_STATUS_OK;
95 }
96
97 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
98 {
99         struct composite_context *c = state->c;
100
101         switch (state->stage) {
102         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
103                 c->status = wreplsrv_out_connect_wait_socket(state);
104                 break;
105         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
106                 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
107                 c->state  = COMPOSITE_STATE_DONE;
108                 break;
109         case WREPLSRV_OUT_CONNECT_STAGE_DONE:
110                 c->status = NT_STATUS_INTERNAL_ERROR;
111         }
112
113         if (!NT_STATUS_IS_OK(c->status)) {
114                 c->state = COMPOSITE_STATE_ERROR;
115         }
116
117         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
118                 c->async.fn(c);
119         }
120 }
121
122 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
123 {
124         struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
125                                                    struct wreplsrv_out_connect_state);
126         wreplsrv_out_connect_handler(state);
127         return;
128 }
129
130 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
131 {
132         struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
133                                                    struct wreplsrv_out_connect_state);
134         wreplsrv_out_connect_handler(state);
135         return;
136 }
137
138 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
139                                                            enum winsrepl_partner_type type,
140                                                            struct wreplsrv_out_connection *wreplconn)
141 {
142         struct composite_context *c = NULL;
143         struct wreplsrv_service *service = partner->service;
144         struct wreplsrv_out_connect_state *state = NULL;
145         struct wreplsrv_out_connection **wreplconnp = &wreplconn;
146         BOOL cached_connection = False;
147
148         c = talloc_zero(partner, struct composite_context);
149         if (!c) goto failed;
150
151         state = talloc_zero(c, struct wreplsrv_out_connect_state);
152         if (!state) goto failed;
153         state->c        = c;
154         state->type     = type;
155
156         c->state        = COMPOSITE_STATE_IN_PROGRESS;
157         c->event_ctx    = service->task->event_ctx;
158         c->private_data = state;
159
160         if (type == WINSREPL_PARTNER_PUSH) {
161                 cached_connection       = True;
162                 wreplconn               = partner->push.wreplconn;
163                 wreplconnp              = &partner->push.wreplconn;
164         } else if (type == WINSREPL_PARTNER_PULL) {
165                 cached_connection       = True;
166                 wreplconn               = partner->pull.wreplconn;
167                 wreplconnp              = &partner->pull.wreplconn;
168         }
169
170         /* we have a connection already, so use it */
171         if (wreplconn) {
172                 if (!wreplconn->sock->dead) {
173                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
174                         state->wreplconn= wreplconn;
175                         composite_done(c);
176                         return c;
177                 } else if (!cached_connection) {
178                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
179                         state->wreplconn= NULL;
180                         composite_done(c);
181                         return c;
182                 } else {
183                         talloc_free(wreplconn);
184                         *wreplconnp = NULL;
185                 }
186         }
187
188         wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
189         if (!wreplconn) goto failed;
190
191         wreplconn->service      = service;
192         wreplconn->partner      = partner;
193         wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx);
194         if (!wreplconn->sock) goto failed;
195
196         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
197         state->wreplconn= wreplconn;
198         state->c_req    = wrepl_connect_send(wreplconn->sock,
199                                              partner->our_address,
200                                              partner->address);
201         if (!state->c_req) goto failed;
202
203         state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
204         state->c_req->async.private_data        = state;
205
206         return c;
207 failed:
208         talloc_free(c);
209         return NULL;
210 }
211
212 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
213                                           struct wreplsrv_out_connection **wreplconn)
214 {
215         NTSTATUS status;
216
217         status = composite_wait(c);
218
219         if (NT_STATUS_IS_OK(status)) {
220                 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
221                                                            struct wreplsrv_out_connect_state);
222                 if (state->wreplconn) {
223                         *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
224                         if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
225                 } else {
226                         status = NT_STATUS_INVALID_CONNECTION;
227                 }
228         }
229
230         talloc_free(c);
231         return status;
232         
233 }
234
/*
 * Stages of the pull-table composite request, in the order the state
 * machine below moves through them.
 */
enum wreplsrv_pull_table_stage {
        /* the outgoing connection request is pending */
        WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
        /* wrepl_pull_table_send() is pending */
        WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
        /* nothing left to wait for */
        WREPLSRV_PULL_TABLE_STAGE_DONE
};
240
/*
 * Private state of one pull-table composite request
 * (created by wreplsrv_pull_table_send()).
 */
struct wreplsrv_pull_table_state {
        /* current position in the pull-table state machine */
        enum wreplsrv_pull_table_stage stage;
        /* composite context this state is attached to */
        struct composite_context *c;
        /* pending table request (from wrepl_pull_table_send()) */
        struct wrepl_request *req;
        /* in/out parameters of the wire-level pull-table call */
        struct wrepl_pull_table table_io;
        /* caller's io structure, as passed to wreplsrv_pull_table_send() */
        struct wreplsrv_pull_table_io *io;
        /* pending outgoing connection request */
        struct composite_context *creq;
        /* connection obtained from wreplsrv_out_connect_recv() */
        struct wreplsrv_out_connection *wreplconn;
};
250
/* installed as request callback before its definition further down */
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
252
253 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
254 {
255         NTSTATUS status;
256
257         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
258         NT_STATUS_NOT_OK_RETURN(status);
259
260         state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
261         state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
262         NT_STATUS_HAVE_NO_MEMORY(state->req);
263
264         state->req->async.fn            = wreplsrv_pull_table_handler_req;
265         state->req->async.private       = state;
266
267         state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
268
269         return NT_STATUS_OK;
270 }
271
272 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
273 {
274         NTSTATUS status;
275
276         status = wrepl_pull_table_recv(state->req, state, &state->table_io);
277         NT_STATUS_NOT_OK_RETURN(status);
278
279         state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
280
281         return NT_STATUS_OK;
282 }
283
284 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
285 {
286         struct composite_context *c = state->c;
287
288         switch (state->stage) {
289         case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
290                 c->status = wreplsrv_pull_table_wait_connection(state);
291                 break;
292         case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
293                 c->status = wreplsrv_pull_table_wait_table_reply(state);
294                 c->state  = COMPOSITE_STATE_DONE;
295                 break;
296         case WREPLSRV_PULL_TABLE_STAGE_DONE:
297                 c->status = NT_STATUS_INTERNAL_ERROR;
298         }
299
300         if (!NT_STATUS_IS_OK(c->status)) {
301                 c->state = COMPOSITE_STATE_ERROR;
302         }
303
304         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
305                 c->async.fn(c);
306         }
307 }
308
309 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
310 {
311         struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
312                                                   struct wreplsrv_pull_table_state);
313         wreplsrv_pull_table_handler(state);
314         return;
315 }
316
317 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
318 {
319         struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private,
320                                                   struct wreplsrv_pull_table_state);
321         wreplsrv_pull_table_handler(state);
322         return;
323 }
324
325 struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
326 {
327         struct composite_context *c = NULL;
328         struct wreplsrv_service *service = io->in.partner->service;
329         struct wreplsrv_pull_table_state *state = NULL;
330
331         c = talloc_zero(mem_ctx, struct composite_context);
332         if (!c) goto failed;
333
334         state = talloc_zero(c, struct wreplsrv_pull_table_state);
335         if (!state) goto failed;
336         state->c        = c;
337         state->io       = io;
338
339         c->state        = COMPOSITE_STATE_IN_PROGRESS;
340         c->event_ctx    = service->task->event_ctx;
341         c->private_data = state;
342
343         if (io->in.num_owners) {
344                 state->table_io.out.num_partners        = io->in.num_owners;
345                 state->table_io.out.partners            = io->in.owners;
346                 state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
347                 composite_done(c);
348                 return c;
349         }
350
351         state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
352         state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
353         if (!state->creq) goto failed;
354
355         state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
356         state->creq->async.private_data = state;
357
358         return c;
359 failed:
360         talloc_free(c);
361         return NULL;
362 }
363
364 NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
365                                   struct wreplsrv_pull_table_io *io)
366 {
367         NTSTATUS status;
368
369         status = composite_wait(c);
370
371         if (NT_STATUS_IS_OK(status)) {
372                 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
373                                                           struct wreplsrv_pull_table_state);
374                 io->out.num_owners      = state->table_io.out.num_partners;
375                 io->out.owners          = state->table_io.out.partners;
376                 talloc_reference(mem_ctx, state->table_io.out.partners);
377         }
378
379         talloc_free(c);
380         return status;  
381 }
382
/*
 * Stages of the pull-names composite request, in the order the state
 * machine below moves through them.
 */
enum wreplsrv_pull_names_stage {
        /* the outgoing connection request is pending */
        WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
        /* wrepl_pull_names_send() is pending */
        WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
        /* nothing left to wait for */
        WREPLSRV_PULL_NAMES_STAGE_DONE
};
388
/*
 * Private state of one pull-names composite request
 * (created by wreplsrv_pull_names_send()).
 */
struct wreplsrv_pull_names_state {
        /* current position in the pull-names state machine */
        enum wreplsrv_pull_names_stage stage;
        /* composite context this state is attached to */
        struct composite_context *c;
        /* pending names request (from wrepl_pull_names_send()) */
        struct wrepl_request *req;
        /* in/out parameters of the wire-level pull-names call */
        struct wrepl_pull_names pull_io;
        /* caller's io structure, as passed to wreplsrv_pull_names_send() */
        struct wreplsrv_pull_names_io *io;
        /* pending outgoing connection request */
        struct composite_context *creq;
        /* connection obtained from wreplsrv_out_connect_recv() */
        struct wreplsrv_out_connection *wreplconn;
};
398
/* installed as request callback before its definition further down */
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
400
401 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
402 {
403         NTSTATUS status;
404
405         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
406         NT_STATUS_NOT_OK_RETURN(status);
407
408         state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
409         state->pull_io.in.partner       = state->io->in.owner;
410         state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
411         NT_STATUS_HAVE_NO_MEMORY(state->req);
412
413         state->req->async.fn            = wreplsrv_pull_names_handler_req;
414         state->req->async.private       = state;
415
416         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
417
418         return NT_STATUS_OK;
419 }
420
421 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
422 {
423         NTSTATUS status;
424
425         status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
426         NT_STATUS_NOT_OK_RETURN(status);
427
428         state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
429
430         return NT_STATUS_OK;
431 }
432
433 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
434 {
435         struct composite_context *c = state->c;
436
437         switch (state->stage) {
438         case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
439                 c->status = wreplsrv_pull_names_wait_connection(state);
440                 break;
441         case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
442                 c->status = wreplsrv_pull_names_wait_send_reply(state);
443                 c->state  = COMPOSITE_STATE_DONE;
444                 break;
445         case WREPLSRV_PULL_NAMES_STAGE_DONE:
446                 c->status = NT_STATUS_INTERNAL_ERROR;
447         }
448
449         if (!NT_STATUS_IS_OK(c->status)) {
450                 c->state = COMPOSITE_STATE_ERROR;
451         }
452
453         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
454                 c->async.fn(c);
455         }
456 }
457
458 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
459 {
460         struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
461                                                   struct wreplsrv_pull_names_state);
462         wreplsrv_pull_names_handler(state);
463         return;
464 }
465
466 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
467 {
468         struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private,
469                                                   struct wreplsrv_pull_names_state);
470         wreplsrv_pull_names_handler(state);
471         return;
472 }
473
474 struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
475 {
476         struct composite_context *c = NULL;
477         struct wreplsrv_service *service = io->in.partner->service;
478         struct wreplsrv_pull_names_state *state = NULL;
479         enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
480
481         if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
482
483         c = talloc_zero(mem_ctx, struct composite_context);
484         if (!c) goto failed;
485
486         state = talloc_zero(c, struct wreplsrv_pull_names_state);
487         if (!state) goto failed;
488         state->c        = c;
489         state->io       = io;
490
491         c->state        = COMPOSITE_STATE_IN_PROGRESS;
492         c->event_ctx    = service->task->event_ctx;
493         c->private_data = state;
494
495         state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
496         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
497         if (!state->creq) goto failed;
498
499         state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
500         state->creq->async.private_data = state;
501
502         return c;
503 failed:
504         talloc_free(c);
505         return NULL;
506 }
507
508 NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
509                                   struct wreplsrv_pull_names_io *io)
510 {
511         NTSTATUS status;
512
513         status = composite_wait(c);
514
515         if (NT_STATUS_IS_OK(status)) {
516                 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
517                                                           struct wreplsrv_pull_names_state);
518                 io->out.num_names       = state->pull_io.out.num_names;
519                 io->out.names           = state->pull_io.out.names;
520                 talloc_reference(mem_ctx, state->pull_io.out.names);
521         }
522
523         talloc_free(c);
524         return status;
525         
526 }
527
/*
 * Stages of the pull-cycle composite request (one complete pull
 * replication run against a partner).
 */
enum wreplsrv_pull_cycle_stage {
        /* the pull-table sub-request is pending */
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
        /* a pull-names sub-request is pending */
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
        /* the associate-stop request is pending */
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
        /* nothing left to wait for */
        WREPLSRV_PULL_CYCLE_STAGE_DONE
};
534
/*
 * Private state of one pull-cycle composite request
 * (created by wreplsrv_pull_cycle_send()).
 */
struct wreplsrv_pull_cycle_state {
        /* current position in the pull-cycle state machine */
        enum wreplsrv_pull_cycle_stage stage;
        /* composite context this state is attached to */
        struct composite_context *c;
        /* caller's io structure, as passed to wreplsrv_pull_cycle_send() */
        struct wreplsrv_pull_cycle_io *io;
        /* parameters and result of the pull-table sub-request */
        struct wreplsrv_pull_table_io table_io;
        /* index into table_io.out.owners where the owner scan resumes */
        uint32_t current;
        /* parameters and result of the current pull-names sub-request */
        struct wreplsrv_pull_names_io names_io;
        /* currently pending sub-request (pull-table or pull-names) */
        struct composite_context *creq;
        /* in/out parameters of the associate-stop call */
        struct wrepl_associate_stop assoc_stop_io;
        /* pending associate-stop request */
        struct wrepl_request *req;
};
546
/*
 * Completion callbacks of the pull-cycle state machine; declared up
 * front because the stage functions install them before their definitions.
 */
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
549
550 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
551 {
552         struct wreplsrv_owner *current_owner;
553         struct wreplsrv_owner *local_owner;
554         uint32_t i;
555         uint64_t old_max_version = 0;
556         BOOL do_pull = False;
557
558         for (i=state->current; i < state->table_io.out.num_owners; i++) {
559                 current_owner = wreplsrv_find_owner(state->io->in.partner->pull.table,
560                                                     state->table_io.out.owners[i].address);
561
562                 local_owner = wreplsrv_find_owner(state->io->in.partner->service->table,
563                                                   state->table_io.out.owners[i].address);
564                 /*
565                  * this means we are ourself the current owner,
566                  * and we don't want replicate ourself
567                  */
568                 if (!current_owner) continue;
569
570                 /*
571                  * this means we don't have any records of this owner
572                  * so fetch them
573                  */
574                 if (!local_owner) {
575                         do_pull         = True;
576                         
577                         break;
578                 }
579
580                 /*
581                  * this means the remote partner has some new records of this owner
582                  * fetch them
583                  */
584                 if (current_owner->owner.max_version > local_owner->owner.max_version) {
585                         do_pull         = True;
586                         old_max_version = local_owner->owner.max_version;
587                         break;
588                 }
589         }
590         state->current = i;
591
592         if (do_pull) {
593                 state->names_io.in.partner              = state->io->in.partner;
594                 state->names_io.in.wreplconn            = state->io->in.wreplconn;
595                 state->names_io.in.owner                = current_owner->owner;
596                 state->names_io.in.owner.min_version    = old_max_version + 1;
597                 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
598                 NT_STATUS_HAVE_NO_MEMORY(state->creq);
599
600                 state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
601                 state->creq->async.private_data = state;
602
603                 return STATUS_MORE_ENTRIES;
604         }
605
606         return NT_STATUS_OK;
607 }
608
609 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
610 {
611         NTSTATUS status;
612
613         status = wreplsrv_pull_cycle_next_owner_do_work(state);
614         if (NT_STATUS_IS_OK(status)) {
615                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
616         } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
617                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
618                 status = NT_STATUS_OK;
619         }
620
621         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
622                 state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
623                 state->assoc_stop_io.in.reason          = 0;
624                 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
625                 NT_STATUS_HAVE_NO_MEMORY(state->req);
626
627                 state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
628                 state->req->async.private       = state;
629
630                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
631         }
632
633         return status;
634 }
635
636 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
637 {
638         NTSTATUS status;
639         uint32_t i;
640
641         status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
642         NT_STATUS_NOT_OK_RETURN(status);
643
644         /* update partner table */
645         for (i=0; i < state->table_io.out.num_owners; i++) {
646                 BOOL is_our_addr;
647
648                 is_our_addr = wreplsrv_is_our_address(state->io->in.partner->service,
649                                                       state->table_io.out.owners[i].address);
650                 if (is_our_addr) continue;
651
652                 status = wreplsrv_add_table(state->io->in.partner->service,
653                                             state->io->in.partner, 
654                                             &state->io->in.partner->pull.table,
655                                             state->table_io.out.owners[i].address,
656                                             state->table_io.out.owners[i].max_version);
657                 NT_STATUS_NOT_OK_RETURN(status);
658         }
659
660         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
661         NT_STATUS_NOT_OK_RETURN(status);
662
663         return status;
664 }
665
666 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
667 {
668         NTSTATUS status;
669
670         status = wreplsrv_apply_records(state->io->in.partner, &state->names_io);
671         NT_STATUS_NOT_OK_RETURN(status);
672
673         talloc_free(state->names_io.out.names);
674         ZERO_STRUCT(state->names_io);
675
676         return NT_STATUS_OK;
677 }
678
679 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
680 {
681         NTSTATUS status;
682
683         status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
684         NT_STATUS_NOT_OK_RETURN(status);
685
686         /*
687          * TODO: this should maybe an async call,
688          *       because we may need some network access
689          *       for conflict resolving
690          */
691         status = wreplsrv_pull_cycle_apply_records(state);
692         NT_STATUS_NOT_OK_RETURN(status);
693
694         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
695         NT_STATUS_NOT_OK_RETURN(status);
696
697         return status;
698 }
699
700 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
701 {
702         NTSTATUS status;
703
704         status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
705         NT_STATUS_NOT_OK_RETURN(status);
706
707         state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
708
709         return status;
710 }
711
712 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
713 {
714         struct composite_context *c = state->c;
715
716         switch (state->stage) {
717         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
718                 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
719                 break;
720         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
721                 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
722                 break;
723         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
724                 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
725                 break;
726         case WREPLSRV_PULL_CYCLE_STAGE_DONE:
727                 c->status = NT_STATUS_INTERNAL_ERROR;
728         }
729
730         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
731                 c->state  = COMPOSITE_STATE_DONE;
732         }
733
734         if (!NT_STATUS_IS_OK(c->status)) {
735                 c->state = COMPOSITE_STATE_ERROR;
736         }
737
738         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
739                 c->async.fn(c);
740         }
741 }
742
743 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
744 {
745         struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
746                                                   struct wreplsrv_pull_cycle_state);
747         wreplsrv_pull_cycle_handler(state);
748         return;
749 }
750
751 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
752 {
753         struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private,
754                                                   struct wreplsrv_pull_cycle_state);
755         wreplsrv_pull_cycle_handler(state);
756         return;
757 }
758
759 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
760 {
761         struct composite_context *c = NULL;
762         struct wreplsrv_service *service = io->in.partner->service;
763         struct wreplsrv_pull_cycle_state *state = NULL;
764
765         c = talloc_zero(mem_ctx, struct composite_context);
766         if (!c) goto failed;
767
768         state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
769         if (!state) goto failed;
770         state->c        = c;
771         state->io       = io;
772
773         c->state        = COMPOSITE_STATE_IN_PROGRESS;
774         c->event_ctx    = service->task->event_ctx;
775         c->private_data = state;
776
777         state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
778         state->table_io.in.partner      = io->in.partner;
779         state->table_io.in.num_owners   = io->in.num_owners;
780         state->table_io.in.owners       = io->in.owners;
781         state->creq = wreplsrv_pull_table_send(state, &state->table_io);
782         if (!state->creq) goto failed;
783
784         state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
785         state->creq->async.private_data = state;
786
787         return c;
788 failed:
789         talloc_free(c);
790         return NULL;
791 }
792
793 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
794 {
795         NTSTATUS status;
796
797         status = composite_wait(c);
798
799         talloc_free(c);
800         return status;
801 }
802
/*
 * Stages of the push-notify composite request. The names suggest
 * connect -> inform -> done; the state machine itself is defined
 * further down in the file.
 */
enum wreplsrv_push_notify_stage {
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
        WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};
808
809 struct wreplsrv_push_notify_state {
810         enum wreplsrv_push_notify_stage stage;
811         struct composite_context *c;
812         struct wreplsrv_push_notify_io *io;
813         enum wrepl_replication_cmd command;
814         BOOL full_table;
815         struct wrepl_send_ctrl ctrl;
816         struct wrepl_request *req;
817         struct wrepl_packet req_packet;
818         struct wrepl_packet *rep_packet;
819         struct composite_context *creq;
820         struct wreplsrv_out_connection *wreplconn;
821 };
822
/*
 * Forward declarations of the completion callbacks; they are installed
 * on sub-requests before their definitions appear further down.
 */
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
825
826 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
827 {
828         struct wreplsrv_service *service = state->io->in.partner->service;
829         struct wrepl_packet *req = &state->req_packet;
830         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
831         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
832         struct wreplsrv_in_connection *wrepl_in;
833         NTSTATUS status;
834         struct socket_context *sock;
835         struct packet_context *packet;
836         uint16_t fde_flags;
837         const char *our_ip;
838
839         /* prepare the outgoing request */
840         req->opcode     = WREPL_OPCODE_BITS;
841         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
842         req->mess_type  = WREPL_REPLICATION;
843
844         repl_out->command = state->command;
845
846         our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
847         NT_STATUS_HAVE_NO_MEMORY(our_ip);
848
849         status = wreplsrv_fill_wrepl_table(service, state, table_out,
850                                            our_ip, our_ip, state->full_table);
851         NT_STATUS_NOT_OK_RETURN(status);
852
853         /* queue the request */
854         state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
855         NT_STATUS_HAVE_NO_MEMORY(state->req);
856
857         /*
858          * now we need to convert the wrepl_socket (client connection)
859          * into a wreplsrv_in_connection (server connection), because
860          * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
861          * message is received by the peer.
862          */
863
864         /* steal the socket_context */
865         sock = state->wreplconn->sock->sock;
866         state->wreplconn->sock->sock = NULL;
867         talloc_steal(state, sock);
868
869         /* 
870          * steal the packet_context
871          * note the request DATA_BLOB we just send on the
872          * wrepl_socket (client connection) is still unter the 
873          * packet context and will be send to the wire
874          */
875         packet = state->wreplconn->sock->packet;
876         state->wreplconn->sock->packet = NULL;
877         talloc_steal(state, packet);
878
879         /*
880          * get the fde_flags of the old fde event,
881          * so that we can later set the same flags to the new one
882          */
883         fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
884
885         /*
886          * free the wrepl_socket (client connection)
887          */
888         talloc_free(state->wreplconn->sock);
889         state->wreplconn->sock = NULL;
890
891         /*
892          * now create a wreplsrv_in_connection,
893          * on which we act as server
894          *
895          * NOTE: sock and packet will be stolen by
896          *       wreplsrv_in_connection_merge()
897          */
898         status = wreplsrv_in_connection_merge(state->io->in.partner,
899                                               sock, packet, &wrepl_in);
900         NT_STATUS_NOT_OK_RETURN(status);
901
902         event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
903
904         wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
905         wrepl_in->assoc_ctx.our_ctx     = 0;
906
907         /* now we can free the wreplsrv_out_connection */
908         talloc_free(state->wreplconn);
909         state->wreplconn = NULL;
910
911         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
912
913         return NT_STATUS_OK;
914 }
915
916 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
917 {
918         struct wreplsrv_service *service = state->io->in.partner->service;
919         struct wrepl_packet *req = &state->req_packet;
920         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
921         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
922         NTSTATUS status;
923         const char *our_ip;
924
925         req->opcode     = WREPL_OPCODE_BITS;
926         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
927         req->mess_type  = WREPL_REPLICATION;
928
929         repl_out->command = state->command;
930
931         our_ip = socket_get_my_addr(state->wreplconn->sock->sock, state);
932         NT_STATUS_HAVE_NO_MEMORY(our_ip);
933
934         status = wreplsrv_fill_wrepl_table(service, state, table_out,
935                                            our_ip, our_ip, state->full_table);
936         NT_STATUS_NOT_OK_RETURN(status);
937
938         /* we won't get a reply to a inform message */
939         state->ctrl.send_only           = True;
940
941         state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
942         NT_STATUS_HAVE_NO_MEMORY(state->req);
943
944         state->req->async.fn            = wreplsrv_push_notify_handler_req;
945         state->req->async.private       = state;
946
947         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
948
949         return NT_STATUS_OK;
950 }
951
952 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
953 {
954         NTSTATUS status;
955
956         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
957         NT_STATUS_NOT_OK_RETURN(status);
958
959         switch (state->command) {
960         case WREPL_REPL_UPDATE:
961                 state->full_table = True;
962                 return wreplsrv_push_notify_update(state);
963         case WREPL_REPL_UPDATE2:
964                 state->full_table = False;
965                 return wreplsrv_push_notify_update(state);
966         case WREPL_REPL_INFORM:
967                 state->full_table = True;
968                 return wreplsrv_push_notify_inform(state);
969         case WREPL_REPL_INFORM2:
970                 state->full_table = False;
971                 return wreplsrv_push_notify_inform(state);
972         default:
973                 return NT_STATUS_INTERNAL_ERROR;
974         }
975
976         return NT_STATUS_INTERNAL_ERROR;
977 }
978
979 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
980 {
981         NTSTATUS status;
982
983         status =  wrepl_request_recv(state->req, state, NULL);
984         NT_STATUS_NOT_OK_RETURN(status);
985
986         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
987         return status;
988 }
989
990 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
991 {
992         struct composite_context *c = state->c;
993
994         switch (state->stage) {
995         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
996                 c->status = wreplsrv_push_notify_wait_connect(state);
997                 break;
998         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
999                 c->status = wreplsrv_push_notify_wait_inform(state);
1000                 break;
1001         case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1002                 c->status = NT_STATUS_INTERNAL_ERROR;
1003         }
1004
1005         if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1006                 c->state  = COMPOSITE_STATE_DONE;
1007         }
1008
1009         if (!NT_STATUS_IS_OK(c->status)) {
1010                 c->state = COMPOSITE_STATE_ERROR;
1011         }
1012
1013         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1014                 c->async.fn(c);
1015         }
1016 }
1017
1018 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1019 {
1020         struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1021                                                    struct wreplsrv_push_notify_state);
1022         wreplsrv_push_notify_handler(state);
1023         return;
1024 }
1025
1026 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1027 {
1028         struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
1029                                                    struct wreplsrv_push_notify_state);
1030         wreplsrv_push_notify_handler(state);
1031         return;
1032 }
1033
1034 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1035 {
1036         struct composite_context *c = NULL;
1037         struct wreplsrv_service *service = io->in.partner->service;
1038         struct wreplsrv_push_notify_state *state = NULL;
1039         enum winsrepl_partner_type partner_type;
1040
1041         c = talloc_zero(mem_ctx, struct composite_context);
1042         if (!c) goto failed;
1043
1044         state = talloc_zero(c, struct wreplsrv_push_notify_state);
1045         if (!state) goto failed;
1046         state->c        = c;
1047         state->io       = io;
1048
1049         if (io->in.inform) {
1050                 /* we can cache the connection in partner->push->wreplconn */
1051                 partner_type = WINSREPL_PARTNER_PUSH;
1052                 if (io->in.propagate) {
1053                         state->command  = WREPL_REPL_INFORM2;
1054                 } else {
1055                         state->command  = WREPL_REPL_INFORM;
1056                 }
1057         } else {
1058                 /* we can NOT cache the connection */
1059                 partner_type = WINSREPL_PARTNER_NONE;
1060                 if (io->in.propagate) {
1061                         state->command  = WREPL_REPL_UPDATE2;
1062                 } else {
1063                         state->command  = WREPL_REPL_UPDATE;
1064                 }       
1065         }
1066
1067         c->state        = COMPOSITE_STATE_IN_PROGRESS;
1068         c->event_ctx    = service->task->event_ctx;
1069         c->private_data = state;
1070
1071         state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1072         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1073         if (!state->creq) goto failed;
1074
1075         state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
1076         state->creq->async.private_data = state;
1077
1078         return c;
1079 failed:
1080         talloc_free(c);
1081         return NULL;
1082 }
1083
1084 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1085 {
1086         NTSTATUS status;
1087
1088         status = composite_wait(c);
1089
1090         talloc_free(c);
1091         return status;
1092 }