Merge branch 'master' of ssh://git.samba.org/data/git/samba into pyregistry
[ira/wip.git] / source4 / wrepl_server / wrepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
34
/*
 * Stages of the outgoing-connect state machine driven by
 * wreplsrv_out_connect_handler().
 */
enum wreplsrv_out_connect_stage {
	/* the socket connect request is pending */
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET = 0,
	/* the associate request is pending */
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
	/* nothing left to wait for */
	WREPLSRV_OUT_CONNECT_STAGE_DONE
};
40
41 struct wreplsrv_out_connect_state {
42         enum wreplsrv_out_connect_stage stage;
43         struct composite_context *c;
44         struct wrepl_request *req;
45         struct composite_context *c_req;
46         struct wrepl_associate assoc_io;
47         enum winsrepl_partner_type type;
48         struct wreplsrv_out_connection *wreplconn;
49 };
50
51 static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
52 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
53
54 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
55 {
56         NTSTATUS status;
57
58         status = wrepl_connect_recv(state->c_req);
59         NT_STATUS_NOT_OK_RETURN(status);
60
61         state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
62         NT_STATUS_HAVE_NO_MEMORY(state->req);
63
64         state->req->async.fn            = wreplsrv_out_connect_handler_req;
65         state->req->async.private       = state;
66
67         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
68
69         return NT_STATUS_OK;
70 }
71
72 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
73 {
74         NTSTATUS status;
75
76         status = wrepl_associate_recv(state->req, &state->assoc_io);
77         NT_STATUS_NOT_OK_RETURN(status);
78
79         state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
80
81         if (state->type == WINSREPL_PARTNER_PUSH) {
82                 state->wreplconn->partner->push.wreplconn = state->wreplconn;
83                 talloc_steal(state->wreplconn->partner, state->wreplconn);
84         } else if (state->type == WINSREPL_PARTNER_PULL) {
85                 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
86                 talloc_steal(state->wreplconn->partner, state->wreplconn);
87         }
88
89         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
90
91         return NT_STATUS_OK;
92 }
93
94 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
95 {
96         struct composite_context *c = state->c;
97
98         switch (state->stage) {
99         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
100                 c->status = wreplsrv_out_connect_wait_socket(state);
101                 break;
102         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
103                 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
104                 c->state  = COMPOSITE_STATE_DONE;
105                 break;
106         case WREPLSRV_OUT_CONNECT_STAGE_DONE:
107                 c->status = NT_STATUS_INTERNAL_ERROR;
108         }
109
110         if (!NT_STATUS_IS_OK(c->status)) {
111                 c->state = COMPOSITE_STATE_ERROR;
112         }
113
114         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
115                 c->async.fn(c);
116         }
117 }
118
119 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
120 {
121         struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
122                                                    struct wreplsrv_out_connect_state);
123         wreplsrv_out_connect_handler(state);
124         return;
125 }
126
127 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
128 {
129         struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
130                                                    struct wreplsrv_out_connect_state);
131         wreplsrv_out_connect_handler(state);
132         return;
133 }
134
135 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
136                                                            enum winsrepl_partner_type type,
137                                                            struct wreplsrv_out_connection *wreplconn)
138 {
139         struct composite_context *c = NULL;
140         struct wreplsrv_service *service = partner->service;
141         struct wreplsrv_out_connect_state *state = NULL;
142         struct wreplsrv_out_connection **wreplconnp = &wreplconn;
143         bool cached_connection = false;
144
145         c = talloc_zero(partner, struct composite_context);
146         if (!c) goto failed;
147
148         state = talloc_zero(c, struct wreplsrv_out_connect_state);
149         if (!state) goto failed;
150         state->c        = c;
151         state->type     = type;
152
153         c->state        = COMPOSITE_STATE_IN_PROGRESS;
154         c->event_ctx    = service->task->event_ctx;
155         c->private_data = state;
156
157         if (type == WINSREPL_PARTNER_PUSH) {
158                 cached_connection       = true;
159                 wreplconn               = partner->push.wreplconn;
160                 wreplconnp              = &partner->push.wreplconn;
161         } else if (type == WINSREPL_PARTNER_PULL) {
162                 cached_connection       = true;
163                 wreplconn               = partner->pull.wreplconn;
164                 wreplconnp              = &partner->pull.wreplconn;
165         }
166
167         /* we have a connection already, so use it */
168         if (wreplconn) {
169                 if (!wreplconn->sock->dead) {
170                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
171                         state->wreplconn= wreplconn;
172                         composite_done(c);
173                         return c;
174                 } else if (!cached_connection) {
175                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
176                         state->wreplconn= NULL;
177                         composite_done(c);
178                         return c;
179                 } else {
180                         talloc_free(wreplconn);
181                         *wreplconnp = NULL;
182                 }
183         }
184
185         wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
186         if (!wreplconn) goto failed;
187
188         wreplconn->service      = service;
189         wreplconn->partner      = partner;
190         wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
191         if (!wreplconn->sock) goto failed;
192
193         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
194         state->wreplconn= wreplconn;
195         state->c_req    = wrepl_connect_send(wreplconn->sock,
196                                              partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
197                                              partner->address);
198         if (!state->c_req) goto failed;
199
200         state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
201         state->c_req->async.private_data        = state;
202
203         return c;
204 failed:
205         talloc_free(c);
206         return NULL;
207 }
208
209 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
210                                           struct wreplsrv_out_connection **wreplconn)
211 {
212         NTSTATUS status;
213
214         status = composite_wait(c);
215
216         if (NT_STATUS_IS_OK(status)) {
217                 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
218                                                            struct wreplsrv_out_connect_state);
219                 if (state->wreplconn) {
220                         *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
221                         if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
222                 } else {
223                         status = NT_STATUS_INVALID_CONNECTION;
224                 }
225         }
226
227         talloc_free(c);
228         return status;
229         
230 }
231
/* In/out parameters of the pull-table operation. */
struct wreplsrv_pull_table_io {
	struct {
		/* partner to fetch the owner table from */
		struct wreplsrv_partner *partner;
		/*
		 * optional already-known table: when num_owners is non-zero
		 * it is handed back as the result without any network request
		 */
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} in;
	struct {
		/* resulting owner table */
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} out;
};
243
/*
 * Stages of the pull-table state machine driven by
 * wreplsrv_pull_table_handler().
 */
enum wreplsrv_pull_table_stage {
	/* the outgoing connect operation is pending */
	WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION = 0,
	/* the pull-table request is pending */
	WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
	/* nothing left to wait for */
	WREPLSRV_PULL_TABLE_STAGE_DONE
};
249
250 struct wreplsrv_pull_table_state {
251         enum wreplsrv_pull_table_stage stage;
252         struct composite_context *c;
253         struct wrepl_request *req;
254         struct wrepl_pull_table table_io;
255         struct wreplsrv_pull_table_io *io;
256         struct composite_context *creq;
257         struct wreplsrv_out_connection *wreplconn;
258 };
259
260 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
261
262 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
263 {
264         NTSTATUS status;
265
266         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
267         NT_STATUS_NOT_OK_RETURN(status);
268
269         state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
270         state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
271         NT_STATUS_HAVE_NO_MEMORY(state->req);
272
273         state->req->async.fn            = wreplsrv_pull_table_handler_req;
274         state->req->async.private       = state;
275
276         state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
277
278         return NT_STATUS_OK;
279 }
280
281 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
282 {
283         NTSTATUS status;
284
285         status = wrepl_pull_table_recv(state->req, state, &state->table_io);
286         NT_STATUS_NOT_OK_RETURN(status);
287
288         state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
289
290         return NT_STATUS_OK;
291 }
292
293 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
294 {
295         struct composite_context *c = state->c;
296
297         switch (state->stage) {
298         case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
299                 c->status = wreplsrv_pull_table_wait_connection(state);
300                 break;
301         case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
302                 c->status = wreplsrv_pull_table_wait_table_reply(state);
303                 c->state  = COMPOSITE_STATE_DONE;
304                 break;
305         case WREPLSRV_PULL_TABLE_STAGE_DONE:
306                 c->status = NT_STATUS_INTERNAL_ERROR;
307         }
308
309         if (!NT_STATUS_IS_OK(c->status)) {
310                 c->state = COMPOSITE_STATE_ERROR;
311         }
312
313         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
314                 c->async.fn(c);
315         }
316 }
317
318 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
319 {
320         struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
321                                                   struct wreplsrv_pull_table_state);
322         wreplsrv_pull_table_handler(state);
323         return;
324 }
325
326 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
327 {
328         struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private,
329                                                   struct wreplsrv_pull_table_state);
330         wreplsrv_pull_table_handler(state);
331         return;
332 }
333
334 static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
335 {
336         struct composite_context *c = NULL;
337         struct wreplsrv_service *service = io->in.partner->service;
338         struct wreplsrv_pull_table_state *state = NULL;
339
340         c = talloc_zero(mem_ctx, struct composite_context);
341         if (!c) goto failed;
342
343         state = talloc_zero(c, struct wreplsrv_pull_table_state);
344         if (!state) goto failed;
345         state->c        = c;
346         state->io       = io;
347
348         c->state        = COMPOSITE_STATE_IN_PROGRESS;
349         c->event_ctx    = service->task->event_ctx;
350         c->private_data = state;
351
352         if (io->in.num_owners) {
353                 state->table_io.out.num_partners        = io->in.num_owners;
354                 state->table_io.out.partners            = io->in.owners;
355                 state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
356                 composite_done(c);
357                 return c;
358         }
359
360         state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
361         state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
362         if (!state->creq) goto failed;
363
364         state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
365         state->creq->async.private_data = state;
366
367         return c;
368 failed:
369         talloc_free(c);
370         return NULL;
371 }
372
373 static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
374                                          struct wreplsrv_pull_table_io *io)
375 {
376         NTSTATUS status;
377
378         status = composite_wait(c);
379
380         if (NT_STATUS_IS_OK(status)) {
381                 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
382                                                           struct wreplsrv_pull_table_state);
383                 io->out.num_owners      = state->table_io.out.num_partners;
384                 io->out.owners          = talloc_reference(mem_ctx, state->table_io.out.partners);
385         }
386
387         talloc_free(c);
388         return status;  
389 }
390
391 struct wreplsrv_pull_names_io {
392         struct {
393                 struct wreplsrv_partner *partner;
394                 struct wreplsrv_out_connection *wreplconn;
395                 struct wrepl_wins_owner owner;
396         } in;
397         struct {
398                 uint32_t num_names;
399                 struct wrepl_name *names;
400         } out;
401 };
402
/*
 * Stages of the pull-names state machine driven by
 * wreplsrv_pull_names_handler().
 */
enum wreplsrv_pull_names_stage {
	/* the outgoing connect operation is pending */
	WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION = 0,
	/* the pull-names request is pending */
	WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
	/* nothing left to wait for */
	WREPLSRV_PULL_NAMES_STAGE_DONE
};
408
409 struct wreplsrv_pull_names_state {
410         enum wreplsrv_pull_names_stage stage;
411         struct composite_context *c;
412         struct wrepl_request *req;
413         struct wrepl_pull_names pull_io;
414         struct wreplsrv_pull_names_io *io;
415         struct composite_context *creq;
416         struct wreplsrv_out_connection *wreplconn;
417 };
418
419 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
420
421 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
422 {
423         NTSTATUS status;
424
425         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
426         NT_STATUS_NOT_OK_RETURN(status);
427
428         state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
429         state->pull_io.in.partner       = state->io->in.owner;
430         state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
431         NT_STATUS_HAVE_NO_MEMORY(state->req);
432
433         state->req->async.fn            = wreplsrv_pull_names_handler_req;
434         state->req->async.private       = state;
435
436         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
437
438         return NT_STATUS_OK;
439 }
440
441 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
442 {
443         NTSTATUS status;
444
445         status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
446         NT_STATUS_NOT_OK_RETURN(status);
447
448         state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
449
450         return NT_STATUS_OK;
451 }
452
453 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
454 {
455         struct composite_context *c = state->c;
456
457         switch (state->stage) {
458         case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
459                 c->status = wreplsrv_pull_names_wait_connection(state);
460                 break;
461         case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
462                 c->status = wreplsrv_pull_names_wait_send_reply(state);
463                 c->state  = COMPOSITE_STATE_DONE;
464                 break;
465         case WREPLSRV_PULL_NAMES_STAGE_DONE:
466                 c->status = NT_STATUS_INTERNAL_ERROR;
467         }
468
469         if (!NT_STATUS_IS_OK(c->status)) {
470                 c->state = COMPOSITE_STATE_ERROR;
471         }
472
473         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
474                 c->async.fn(c);
475         }
476 }
477
478 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
479 {
480         struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
481                                                   struct wreplsrv_pull_names_state);
482         wreplsrv_pull_names_handler(state);
483         return;
484 }
485
486 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
487 {
488         struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private,
489                                                   struct wreplsrv_pull_names_state);
490         wreplsrv_pull_names_handler(state);
491         return;
492 }
493
494 static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
495 {
496         struct composite_context *c = NULL;
497         struct wreplsrv_service *service = io->in.partner->service;
498         struct wreplsrv_pull_names_state *state = NULL;
499         enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
500
501         if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
502
503         c = talloc_zero(mem_ctx, struct composite_context);
504         if (!c) goto failed;
505
506         state = talloc_zero(c, struct wreplsrv_pull_names_state);
507         if (!state) goto failed;
508         state->c        = c;
509         state->io       = io;
510
511         c->state        = COMPOSITE_STATE_IN_PROGRESS;
512         c->event_ctx    = service->task->event_ctx;
513         c->private_data = state;
514
515         state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
516         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
517         if (!state->creq) goto failed;
518
519         state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
520         state->creq->async.private_data = state;
521
522         return c;
523 failed:
524         talloc_free(c);
525         return NULL;
526 }
527
528 static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
529                                          struct wreplsrv_pull_names_io *io)
530 {
531         NTSTATUS status;
532
533         status = composite_wait(c);
534
535         if (NT_STATUS_IS_OK(status)) {
536                 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
537                                                           struct wreplsrv_pull_names_state);
538                 io->out.num_names       = state->pull_io.out.num_names;
539                 io->out.names           = talloc_reference(mem_ctx, state->pull_io.out.names);
540         }
541
542         talloc_free(c);
543         return status;
544         
545 }
546
/*
 * Stages of the pull-cycle state machine driven by
 * wreplsrv_pull_cycle_handler().
 */
enum wreplsrv_pull_cycle_stage {
	/* the pull-table operation is pending */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY = 0,
	/* a pull-names operation for one owner is pending */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
	/* the associate-stop request is pending */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
	/* nothing left to wait for */
	WREPLSRV_PULL_CYCLE_STAGE_DONE
};
553
554 struct wreplsrv_pull_cycle_state {
555         enum wreplsrv_pull_cycle_stage stage;
556         struct composite_context *c;
557         struct wreplsrv_pull_cycle_io *io;
558         struct wreplsrv_pull_table_io table_io;
559         uint32_t current;
560         struct wreplsrv_pull_names_io names_io;
561         struct composite_context *creq;
562         struct wrepl_associate_stop assoc_stop_io;
563         struct wrepl_request *req;
564 };
565
566 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
567 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
568
569 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
570 {
571         struct wreplsrv_owner *current_owner=NULL;
572         struct wreplsrv_owner *local_owner;
573         uint32_t i;
574         uint64_t old_max_version = 0;
575         bool do_pull = false;
576
577         for (i=state->current; i < state->table_io.out.num_owners; i++) {
578                 current_owner = wreplsrv_find_owner(state->io->in.partner->service,
579                                                     state->io->in.partner->pull.table,
580                                                     state->table_io.out.owners[i].address);
581
582                 local_owner = wreplsrv_find_owner(state->io->in.partner->service,
583                                                   state->io->in.partner->service->table,
584                                                   state->table_io.out.owners[i].address);
585                 /*
586                  * this means we are ourself the current owner,
587                  * and we don't want replicate ourself
588                  */
589                 if (!current_owner) continue;
590
591                 /*
592                  * this means we don't have any records of this owner
593                  * so fetch them
594                  */
595                 if (!local_owner) {
596                         do_pull         = true;
597                         
598                         break;
599                 }
600
601                 /*
602                  * this means the remote partner has some new records of this owner
603                  * fetch them
604                  */
605                 if (current_owner->owner.max_version > local_owner->owner.max_version) {
606                         do_pull         = true;
607                         old_max_version = local_owner->owner.max_version;
608                         break;
609                 }
610         }
611         state->current = i;
612
613         if (do_pull) {
614                 state->names_io.in.partner              = state->io->in.partner;
615                 state->names_io.in.wreplconn            = state->io->in.wreplconn;
616                 state->names_io.in.owner                = current_owner->owner;
617                 state->names_io.in.owner.min_version    = old_max_version + 1;
618                 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
619                 NT_STATUS_HAVE_NO_MEMORY(state->creq);
620
621                 state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
622                 state->creq->async.private_data = state;
623
624                 return STATUS_MORE_ENTRIES;
625         }
626
627         return NT_STATUS_OK;
628 }
629
630 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
631 {
632         NTSTATUS status;
633
634         status = wreplsrv_pull_cycle_next_owner_do_work(state);
635         if (NT_STATUS_IS_OK(status)) {
636                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
637         } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
638                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
639                 status = NT_STATUS_OK;
640         }
641
642         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
643                 state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
644                 state->assoc_stop_io.in.reason          = 0;
645                 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
646                 NT_STATUS_HAVE_NO_MEMORY(state->req);
647
648                 state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
649                 state->req->async.private       = state;
650
651                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
652         }
653
654         return status;
655 }
656
657 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
658 {
659         NTSTATUS status;
660         uint32_t i;
661
662         status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
663         NT_STATUS_NOT_OK_RETURN(status);
664
665         /* update partner table */
666         for (i=0; i < state->table_io.out.num_owners; i++) {
667                 status = wreplsrv_add_table(state->io->in.partner->service,
668                                             state->io->in.partner, 
669                                             &state->io->in.partner->pull.table,
670                                             state->table_io.out.owners[i].address,
671                                             state->table_io.out.owners[i].max_version);
672                 NT_STATUS_NOT_OK_RETURN(status);
673         }
674
675         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
676         NT_STATUS_NOT_OK_RETURN(status);
677
678         return status;
679 }
680
681 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
682 {
683         NTSTATUS status;
684
685         status = wreplsrv_apply_records(state->io->in.partner,
686                                         &state->names_io.in.owner,
687                                         state->names_io.out.num_names,
688                                         state->names_io.out.names);
689         NT_STATUS_NOT_OK_RETURN(status);
690
691         talloc_free(state->names_io.out.names);
692         ZERO_STRUCT(state->names_io);
693
694         return NT_STATUS_OK;
695 }
696
697 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
698 {
699         NTSTATUS status;
700
701         status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
702         NT_STATUS_NOT_OK_RETURN(status);
703
704         /*
705          * TODO: this should maybe an async call,
706          *       because we may need some network access
707          *       for conflict resolving
708          */
709         status = wreplsrv_pull_cycle_apply_records(state);
710         NT_STATUS_NOT_OK_RETURN(status);
711
712         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
713         NT_STATUS_NOT_OK_RETURN(status);
714
715         return status;
716 }
717
718 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
719 {
720         NTSTATUS status;
721
722         status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
723         NT_STATUS_NOT_OK_RETURN(status);
724
725         state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
726
727         return status;
728 }
729
730 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
731 {
732         struct composite_context *c = state->c;
733
734         switch (state->stage) {
735         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
736                 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
737                 break;
738         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
739                 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
740                 break;
741         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
742                 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
743                 break;
744         case WREPLSRV_PULL_CYCLE_STAGE_DONE:
745                 c->status = NT_STATUS_INTERNAL_ERROR;
746         }
747
748         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
749                 c->state  = COMPOSITE_STATE_DONE;
750         }
751
752         if (!NT_STATUS_IS_OK(c->status)) {
753                 c->state = COMPOSITE_STATE_ERROR;
754         }
755
756         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
757                 c->async.fn(c);
758         }
759 }
760
761 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
762 {
763         struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
764                                                   struct wreplsrv_pull_cycle_state);
765         wreplsrv_pull_cycle_handler(state);
766         return;
767 }
768
769 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
770 {
771         struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private,
772                                                   struct wreplsrv_pull_cycle_state);
773         wreplsrv_pull_cycle_handler(state);
774         return;
775 }
776
777 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
778 {
779         struct composite_context *c = NULL;
780         struct wreplsrv_service *service = io->in.partner->service;
781         struct wreplsrv_pull_cycle_state *state = NULL;
782
783         c = talloc_zero(mem_ctx, struct composite_context);
784         if (!c) goto failed;
785
786         state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
787         if (!state) goto failed;
788         state->c        = c;
789         state->io       = io;
790
791         c->state        = COMPOSITE_STATE_IN_PROGRESS;
792         c->event_ctx    = service->task->event_ctx;
793         c->private_data = state;
794
795         state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
796         state->table_io.in.partner      = io->in.partner;
797         state->table_io.in.num_owners   = io->in.num_owners;
798         state->table_io.in.owners       = io->in.owners;
799         state->creq = wreplsrv_pull_table_send(state, &state->table_io);
800         if (!state->creq) goto failed;
801
802         state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
803         state->creq->async.private_data = state;
804
805         return c;
806 failed:
807         talloc_free(c);
808         return NULL;
809 }
810
811 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
812 {
813         NTSTATUS status;
814
815         status = composite_wait(c);
816
817         talloc_free(c);
818         return status;
819 }
820
/* the steps of the push notify state machine */
enum wreplsrv_push_notify_stage {
        /* the outgoing connect request is still pending */
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
        /* an inform message was queued and its request has not finished yet */
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
        /* nothing left to do */
        WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};
826
827 struct wreplsrv_push_notify_state {
828         enum wreplsrv_push_notify_stage stage;
829         struct composite_context *c;
830         struct wreplsrv_push_notify_io *io;
831         enum wrepl_replication_cmd command;
832         bool full_table;
833         struct wrepl_send_ctrl ctrl;
834         struct wrepl_request *req;
835         struct wrepl_packet req_packet;
836         struct wrepl_packet *rep_packet;
837         struct composite_context *creq;
838         struct wreplsrv_out_connection *wreplconn;
839 };
840
/* completion callbacks, defined further down after the stage functions */
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
843
844 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
845 {
846         struct wreplsrv_service *service = state->io->in.partner->service;
847         struct wrepl_packet *req = &state->req_packet;
848         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
849         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
850         struct wreplsrv_in_connection *wrepl_in;
851         NTSTATUS status;
852         struct socket_context *sock;
853         struct packet_context *packet;
854         uint16_t fde_flags;
855
856         /* prepare the outgoing request */
857         req->opcode     = WREPL_OPCODE_BITS;
858         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
859         req->mess_type  = WREPL_REPLICATION;
860
861         repl_out->command = state->command;
862
863         status = wreplsrv_fill_wrepl_table(service, state, table_out,
864                                            service->wins_db->local_owner, state->full_table);
865         NT_STATUS_NOT_OK_RETURN(status);
866
867         /* queue the request */
868         state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
869         NT_STATUS_HAVE_NO_MEMORY(state->req);
870
871         /*
872          * now we need to convert the wrepl_socket (client connection)
873          * into a wreplsrv_in_connection (server connection), because
874          * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
875          * message is received by the peer.
876          */
877
878         /* steal the socket_context */
879         sock = state->wreplconn->sock->sock;
880         state->wreplconn->sock->sock = NULL;
881         talloc_steal(state, sock);
882
883         /* 
884          * steal the packet_context
885          * note the request DATA_BLOB we just send on the
886          * wrepl_socket (client connection) is still unter the 
887          * packet context and will be send to the wire
888          */
889         packet = state->wreplconn->sock->packet;
890         state->wreplconn->sock->packet = NULL;
891         talloc_steal(state, packet);
892
893         /*
894          * get the fde_flags of the old fde event,
895          * so that we can later set the same flags to the new one
896          */
897         fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
898
899         /*
900          * free the wrepl_socket (client connection)
901          */
902         talloc_free(state->wreplconn->sock);
903         state->wreplconn->sock = NULL;
904
905         /*
906          * now create a wreplsrv_in_connection,
907          * on which we act as server
908          *
909          * NOTE: sock and packet will be stolen by
910          *       wreplsrv_in_connection_merge()
911          */
912         status = wreplsrv_in_connection_merge(state->io->in.partner,
913                                               sock, packet, &wrepl_in);
914         NT_STATUS_NOT_OK_RETURN(status);
915
916         event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
917
918         wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
919         wrepl_in->assoc_ctx.our_ctx     = 0;
920
921         /* now we can free the wreplsrv_out_connection */
922         talloc_free(state->wreplconn);
923         state->wreplconn = NULL;
924
925         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
926
927         return NT_STATUS_OK;
928 }
929
930 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
931 {
932         struct wreplsrv_service *service = state->io->in.partner->service;
933         struct wrepl_packet *req = &state->req_packet;
934         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
935         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
936         NTSTATUS status;
937
938         req->opcode     = WREPL_OPCODE_BITS;
939         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
940         req->mess_type  = WREPL_REPLICATION;
941
942         repl_out->command = state->command;
943
944         status = wreplsrv_fill_wrepl_table(service, state, table_out,
945                                            service->wins_db->local_owner, state->full_table);
946         NT_STATUS_NOT_OK_RETURN(status);
947
948         /* we won't get a reply to a inform message */
949         state->ctrl.send_only           = true;
950
951         state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
952         NT_STATUS_HAVE_NO_MEMORY(state->req);
953
954         state->req->async.fn            = wreplsrv_push_notify_handler_req;
955         state->req->async.private       = state;
956
957         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
958
959         return NT_STATUS_OK;
960 }
961
962 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
963 {
964         NTSTATUS status;
965
966         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
967         NT_STATUS_NOT_OK_RETURN(status);
968
969         switch (state->command) {
970         case WREPL_REPL_UPDATE:
971                 state->full_table = true;
972                 return wreplsrv_push_notify_update(state);
973         case WREPL_REPL_UPDATE2:
974                 state->full_table = false;
975                 return wreplsrv_push_notify_update(state);
976         case WREPL_REPL_INFORM:
977                 state->full_table = true;
978                 return wreplsrv_push_notify_inform(state);
979         case WREPL_REPL_INFORM2:
980                 state->full_table = false;
981                 return wreplsrv_push_notify_inform(state);
982         default:
983                 return NT_STATUS_INTERNAL_ERROR;
984         }
985
986         return NT_STATUS_INTERNAL_ERROR;
987 }
988
989 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
990 {
991         NTSTATUS status;
992
993         status =  wrepl_request_recv(state->req, state, NULL);
994         NT_STATUS_NOT_OK_RETURN(status);
995
996         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
997         return status;
998 }
999
1000 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1001 {
1002         struct composite_context *c = state->c;
1003
1004         switch (state->stage) {
1005         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1006                 c->status = wreplsrv_push_notify_wait_connect(state);
1007                 break;
1008         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1009                 c->status = wreplsrv_push_notify_wait_inform(state);
1010                 break;
1011         case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1012                 c->status = NT_STATUS_INTERNAL_ERROR;
1013         }
1014
1015         if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1016                 c->state  = COMPOSITE_STATE_DONE;
1017         }
1018
1019         if (!NT_STATUS_IS_OK(c->status)) {
1020                 c->state = COMPOSITE_STATE_ERROR;
1021         }
1022
1023         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1024                 c->async.fn(c);
1025         }
1026 }
1027
1028 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1029 {
1030         struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1031                                                    struct wreplsrv_push_notify_state);
1032         wreplsrv_push_notify_handler(state);
1033         return;
1034 }
1035
1036 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1037 {
1038         struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
1039                                                    struct wreplsrv_push_notify_state);
1040         wreplsrv_push_notify_handler(state);
1041         return;
1042 }
1043
1044 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1045 {
1046         struct composite_context *c = NULL;
1047         struct wreplsrv_service *service = io->in.partner->service;
1048         struct wreplsrv_push_notify_state *state = NULL;
1049         enum winsrepl_partner_type partner_type;
1050
1051         c = talloc_zero(mem_ctx, struct composite_context);
1052         if (!c) goto failed;
1053
1054         state = talloc_zero(c, struct wreplsrv_push_notify_state);
1055         if (!state) goto failed;
1056         state->c        = c;
1057         state->io       = io;
1058
1059         if (io->in.inform) {
1060                 /* we can cache the connection in partner->push->wreplconn */
1061                 partner_type = WINSREPL_PARTNER_PUSH;
1062                 if (io->in.propagate) {
1063                         state->command  = WREPL_REPL_INFORM2;
1064                 } else {
1065                         state->command  = WREPL_REPL_INFORM;
1066                 }
1067         } else {
1068                 /* we can NOT cache the connection */
1069                 partner_type = WINSREPL_PARTNER_NONE;
1070                 if (io->in.propagate) {
1071                         state->command  = WREPL_REPL_UPDATE2;
1072                 } else {
1073                         state->command  = WREPL_REPL_UPDATE;
1074                 }       
1075         }
1076
1077         c->state        = COMPOSITE_STATE_IN_PROGRESS;
1078         c->event_ctx    = service->task->event_ctx;
1079         c->private_data = state;
1080
1081         state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1082         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1083         if (!state->creq) goto failed;
1084
1085         state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
1086         state->creq->async.private_data = state;
1087
1088         return c;
1089 failed:
1090         talloc_free(c);
1091         return NULL;
1092 }
1093
1094 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1095 {
1096         NTSTATUS status;
1097
1098         status = composite_wait(c);
1099
1100         talloc_free(c);
1101         return status;
1102 }