s4:wrepl_request: s/private/private_data
[bbaumbach/samba-autobuild/.git] / source4 / wrepl_server / wrepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
34
/*
 * Stages of the outgoing-connection state machine, listed in the order
 * in which a fresh connection passes through them.
 */
enum wreplsrv_out_connect_stage {
        /* the socket connect (wrepl_connect_send) is still pending */
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET          = 0,
        /* the associate request (wrepl_associate_send) is still pending */
        WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX       = 1,
        /* nothing left to wait for */
        WREPLSRV_OUT_CONNECT_STAGE_DONE                 = 2
};
40
41 struct wreplsrv_out_connect_state {
42         enum wreplsrv_out_connect_stage stage;
43         struct composite_context *c;
44         struct wrepl_request *req;
45         struct composite_context *c_req;
46         struct wrepl_associate assoc_io;
47         enum winsrepl_partner_type type;
48         struct wreplsrv_out_connection *wreplconn;
49 };
50
/* completion callbacks that re-enter the connect state machine */
static void wreplsrv_out_connect_handler_creq(struct composite_context *creq);
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
53
54 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
55 {
56         NTSTATUS status;
57
58         status = wrepl_connect_recv(state->c_req);
59         NT_STATUS_NOT_OK_RETURN(status);
60
61         state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
62         NT_STATUS_HAVE_NO_MEMORY(state->req);
63
64         state->req->async.fn            = wreplsrv_out_connect_handler_req;
65         state->req->async.private_data  = state;
66
67         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
68
69         return NT_STATUS_OK;
70 }
71
72 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
73 {
74         NTSTATUS status;
75
76         status = wrepl_associate_recv(state->req, &state->assoc_io);
77         NT_STATUS_NOT_OK_RETURN(status);
78
79         state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
80         state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
81
82         if (state->type == WINSREPL_PARTNER_PUSH) {
83                 if (state->wreplconn->assoc_ctx.peer_major >= 5) {
84                         state->wreplconn->partner->push.wreplconn = state->wreplconn;
85                         talloc_steal(state->wreplconn->partner, state->wreplconn);
86                 } else {
87                         state->type = WINSREPL_PARTNER_NONE;
88                 }
89         } else if (state->type == WINSREPL_PARTNER_PULL) {
90                 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
91                 talloc_steal(state->wreplconn->partner, state->wreplconn);
92         }
93
94         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
95
96         return NT_STATUS_OK;
97 }
98
99 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
100 {
101         struct composite_context *c = state->c;
102
103         switch (state->stage) {
104         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
105                 c->status = wreplsrv_out_connect_wait_socket(state);
106                 break;
107         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
108                 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
109                 c->state  = COMPOSITE_STATE_DONE;
110                 break;
111         case WREPLSRV_OUT_CONNECT_STAGE_DONE:
112                 c->status = NT_STATUS_INTERNAL_ERROR;
113         }
114
115         if (!NT_STATUS_IS_OK(c->status)) {
116                 c->state = COMPOSITE_STATE_ERROR;
117         }
118
119         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
120                 c->async.fn(c);
121         }
122 }
123
124 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
125 {
126         struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
127                                                    struct wreplsrv_out_connect_state);
128         wreplsrv_out_connect_handler(state);
129         return;
130 }
131
132 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
133 {
134         struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private_data,
135                                                    struct wreplsrv_out_connect_state);
136         wreplsrv_out_connect_handler(state);
137         return;
138 }
139
140 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
141                                                            enum winsrepl_partner_type type,
142                                                            struct wreplsrv_out_connection *wreplconn)
143 {
144         struct composite_context *c = NULL;
145         struct wreplsrv_service *service = partner->service;
146         struct wreplsrv_out_connect_state *state = NULL;
147         struct wreplsrv_out_connection **wreplconnp = &wreplconn;
148         bool cached_connection = false;
149
150         c = talloc_zero(partner, struct composite_context);
151         if (!c) goto failed;
152
153         state = talloc_zero(c, struct wreplsrv_out_connect_state);
154         if (!state) goto failed;
155         state->c        = c;
156         state->type     = type;
157
158         c->state        = COMPOSITE_STATE_IN_PROGRESS;
159         c->event_ctx    = service->task->event_ctx;
160         c->private_data = state;
161
162         if (type == WINSREPL_PARTNER_PUSH) {
163                 cached_connection       = true;
164                 wreplconn               = partner->push.wreplconn;
165                 wreplconnp              = &partner->push.wreplconn;
166         } else if (type == WINSREPL_PARTNER_PULL) {
167                 cached_connection       = true;
168                 wreplconn               = partner->pull.wreplconn;
169                 wreplconnp              = &partner->pull.wreplconn;
170         }
171
172         /* we have a connection already, so use it */
173         if (wreplconn) {
174                 if (!wreplconn->sock->dead) {
175                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
176                         state->wreplconn= wreplconn;
177                         composite_done(c);
178                         return c;
179                 } else if (!cached_connection) {
180                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
181                         state->wreplconn= NULL;
182                         composite_done(c);
183                         return c;
184                 } else {
185                         talloc_free(wreplconn);
186                         *wreplconnp = NULL;
187                 }
188         }
189
190         wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
191         if (!wreplconn) goto failed;
192
193         wreplconn->service      = service;
194         wreplconn->partner      = partner;
195         wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
196         if (!wreplconn->sock) goto failed;
197
198         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
199         state->wreplconn= wreplconn;
200         state->c_req    = wrepl_connect_send(wreplconn->sock,
201                                              partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
202                                              partner->address);
203         if (!state->c_req) goto failed;
204
205         state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
206         state->c_req->async.private_data        = state;
207
208         return c;
209 failed:
210         talloc_free(c);
211         return NULL;
212 }
213
214 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
215                                           struct wreplsrv_out_connection **wreplconn)
216 {
217         NTSTATUS status;
218
219         status = composite_wait(c);
220
221         if (NT_STATUS_IS_OK(status)) {
222                 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
223                                                            struct wreplsrv_out_connect_state);
224                 if (state->wreplconn) {
225                         *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
226                         if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
227                 } else {
228                         status = NT_STATUS_INVALID_CONNECTION;
229                 }
230         }
231
232         talloc_free(c);
233         return status;
234         
235 }
236
/* Arguments and results of the pull-table operation. */
struct wreplsrv_pull_table_io {
        struct {
                /* partner whose owner table is wanted */
                struct wreplsrv_partner *partner;
                /*
                 * optional ready-made table: when num_owners is non-zero
                 * it is returned as the result without asking the partner
                 */
                uint32_t num_owners;
                struct wrepl_wins_owner *owners;
        } in;
        struct {
                /* resulting owner table */
                uint32_t num_owners;
                struct wrepl_wins_owner *owners;
        } out;
};
248
/* Stages of the pull-table state machine. */
enum wreplsrv_pull_table_stage {
        /* waiting for the outgoing connection */
        WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION       = 0,
        /* waiting for the partner's reply to the table request */
        WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY      = 1,
        /* nothing left to wait for */
        WREPLSRV_PULL_TABLE_STAGE_DONE                  = 2
};
254
255 struct wreplsrv_pull_table_state {
256         enum wreplsrv_pull_table_stage stage;
257         struct composite_context *c;
258         struct wrepl_request *req;
259         struct wrepl_pull_table table_io;
260         struct wreplsrv_pull_table_io *io;
261         struct composite_context *creq;
262         struct wreplsrv_out_connection *wreplconn;
263 };
264
/* callback for the table request; defined after the state machine */
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
266
267 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
268 {
269         NTSTATUS status;
270
271         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
272         NT_STATUS_NOT_OK_RETURN(status);
273
274         state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
275         state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
276         NT_STATUS_HAVE_NO_MEMORY(state->req);
277
278         state->req->async.fn            = wreplsrv_pull_table_handler_req;
279         state->req->async.private_data  = state;
280
281         state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
282
283         return NT_STATUS_OK;
284 }
285
286 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
287 {
288         NTSTATUS status;
289
290         status = wrepl_pull_table_recv(state->req, state, &state->table_io);
291         NT_STATUS_NOT_OK_RETURN(status);
292
293         state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
294
295         return NT_STATUS_OK;
296 }
297
298 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
299 {
300         struct composite_context *c = state->c;
301
302         switch (state->stage) {
303         case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
304                 c->status = wreplsrv_pull_table_wait_connection(state);
305                 break;
306         case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
307                 c->status = wreplsrv_pull_table_wait_table_reply(state);
308                 c->state  = COMPOSITE_STATE_DONE;
309                 break;
310         case WREPLSRV_PULL_TABLE_STAGE_DONE:
311                 c->status = NT_STATUS_INTERNAL_ERROR;
312         }
313
314         if (!NT_STATUS_IS_OK(c->status)) {
315                 c->state = COMPOSITE_STATE_ERROR;
316         }
317
318         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
319                 c->async.fn(c);
320         }
321 }
322
323 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
324 {
325         struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
326                                                   struct wreplsrv_pull_table_state);
327         wreplsrv_pull_table_handler(state);
328         return;
329 }
330
331 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
332 {
333         struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private_data,
334                                                   struct wreplsrv_pull_table_state);
335         wreplsrv_pull_table_handler(state);
336         return;
337 }
338
339 static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
340 {
341         struct composite_context *c = NULL;
342         struct wreplsrv_service *service = io->in.partner->service;
343         struct wreplsrv_pull_table_state *state = NULL;
344
345         c = talloc_zero(mem_ctx, struct composite_context);
346         if (!c) goto failed;
347
348         state = talloc_zero(c, struct wreplsrv_pull_table_state);
349         if (!state) goto failed;
350         state->c        = c;
351         state->io       = io;
352
353         c->state        = COMPOSITE_STATE_IN_PROGRESS;
354         c->event_ctx    = service->task->event_ctx;
355         c->private_data = state;
356
357         if (io->in.num_owners) {
358                 state->table_io.out.num_partners        = io->in.num_owners;
359                 state->table_io.out.partners            = io->in.owners;
360                 state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
361                 composite_done(c);
362                 return c;
363         }
364
365         state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
366         state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
367         if (!state->creq) goto failed;
368
369         state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
370         state->creq->async.private_data = state;
371
372         return c;
373 failed:
374         talloc_free(c);
375         return NULL;
376 }
377
378 static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
379                                          struct wreplsrv_pull_table_io *io)
380 {
381         NTSTATUS status;
382
383         status = composite_wait(c);
384
385         if (NT_STATUS_IS_OK(status)) {
386                 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
387                                                           struct wreplsrv_pull_table_state);
388                 io->out.num_owners      = state->table_io.out.num_partners;
389                 io->out.owners          = talloc_reference(mem_ctx, state->table_io.out.partners);
390         }
391
392         talloc_free(c);
393         return status;  
394 }
395
396 struct wreplsrv_pull_names_io {
397         struct {
398                 struct wreplsrv_partner *partner;
399                 struct wreplsrv_out_connection *wreplconn;
400                 struct wrepl_wins_owner owner;
401         } in;
402         struct {
403                 uint32_t num_names;
404                 struct wrepl_name *names;
405         } out;
406 };
407
/* Stages of the pull-names state machine. */
enum wreplsrv_pull_names_stage {
        /* waiting for the outgoing connection */
        WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION       = 0,
        /* waiting for the partner's reply to the names request */
        WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY       = 1,
        /* nothing left to wait for */
        WREPLSRV_PULL_NAMES_STAGE_DONE                  = 2
};
413
414 struct wreplsrv_pull_names_state {
415         enum wreplsrv_pull_names_stage stage;
416         struct composite_context *c;
417         struct wrepl_request *req;
418         struct wrepl_pull_names pull_io;
419         struct wreplsrv_pull_names_io *io;
420         struct composite_context *creq;
421         struct wreplsrv_out_connection *wreplconn;
422 };
423
/* callback for the names request; defined after the state machine */
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
425
426 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
427 {
428         NTSTATUS status;
429
430         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
431         NT_STATUS_NOT_OK_RETURN(status);
432
433         state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
434         state->pull_io.in.partner       = state->io->in.owner;
435         state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
436         NT_STATUS_HAVE_NO_MEMORY(state->req);
437
438         state->req->async.fn            = wreplsrv_pull_names_handler_req;
439         state->req->async.private_data  = state;
440
441         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
442
443         return NT_STATUS_OK;
444 }
445
446 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
447 {
448         NTSTATUS status;
449
450         status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
451         NT_STATUS_NOT_OK_RETURN(status);
452
453         state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
454
455         return NT_STATUS_OK;
456 }
457
458 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
459 {
460         struct composite_context *c = state->c;
461
462         switch (state->stage) {
463         case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
464                 c->status = wreplsrv_pull_names_wait_connection(state);
465                 break;
466         case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
467                 c->status = wreplsrv_pull_names_wait_send_reply(state);
468                 c->state  = COMPOSITE_STATE_DONE;
469                 break;
470         case WREPLSRV_PULL_NAMES_STAGE_DONE:
471                 c->status = NT_STATUS_INTERNAL_ERROR;
472         }
473
474         if (!NT_STATUS_IS_OK(c->status)) {
475                 c->state = COMPOSITE_STATE_ERROR;
476         }
477
478         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
479                 c->async.fn(c);
480         }
481 }
482
483 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
484 {
485         struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
486                                                   struct wreplsrv_pull_names_state);
487         wreplsrv_pull_names_handler(state);
488         return;
489 }
490
491 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
492 {
493         struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private_data,
494                                                   struct wreplsrv_pull_names_state);
495         wreplsrv_pull_names_handler(state);
496         return;
497 }
498
499 static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
500 {
501         struct composite_context *c = NULL;
502         struct wreplsrv_service *service = io->in.partner->service;
503         struct wreplsrv_pull_names_state *state = NULL;
504         enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
505
506         if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
507
508         c = talloc_zero(mem_ctx, struct composite_context);
509         if (!c) goto failed;
510
511         state = talloc_zero(c, struct wreplsrv_pull_names_state);
512         if (!state) goto failed;
513         state->c        = c;
514         state->io       = io;
515
516         c->state        = COMPOSITE_STATE_IN_PROGRESS;
517         c->event_ctx    = service->task->event_ctx;
518         c->private_data = state;
519
520         state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
521         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
522         if (!state->creq) goto failed;
523
524         state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
525         state->creq->async.private_data = state;
526
527         return c;
528 failed:
529         talloc_free(c);
530         return NULL;
531 }
532
533 static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
534                                          struct wreplsrv_pull_names_io *io)
535 {
536         NTSTATUS status;
537
538         status = composite_wait(c);
539
540         if (NT_STATUS_IS_OK(status)) {
541                 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
542                                                           struct wreplsrv_pull_names_state);
543                 io->out.num_names       = state->pull_io.out.num_names;
544                 io->out.names           = talloc_reference(mem_ctx, state->pull_io.out.names);
545         }
546
547         talloc_free(c);
548         return status;
549         
550 }
551
/* Stages of the pull-cycle state machine. */
enum wreplsrv_pull_cycle_stage {
        /* waiting for the partner's owner table */
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY      = 0,
        /* waiting for the records of one owner */
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES     = 1,
        /* waiting for the association stop to complete */
        WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC       = 2,
        /* nothing left to wait for */
        WREPLSRV_PULL_CYCLE_STAGE_DONE                  = 3
};
558
559 struct wreplsrv_pull_cycle_state {
560         enum wreplsrv_pull_cycle_stage stage;
561         struct composite_context *c;
562         struct wreplsrv_pull_cycle_io *io;
563         struct wreplsrv_pull_table_io table_io;
564         uint32_t current;
565         struct wreplsrv_pull_names_io names_io;
566         struct composite_context *creq;
567         struct wrepl_associate_stop assoc_stop_io;
568         struct wrepl_request *req;
569 };
570
/* completion callbacks that re-enter the pull-cycle state machine */
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
573
574 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
575 {
576         struct wreplsrv_owner *current_owner=NULL;
577         struct wreplsrv_owner *local_owner;
578         uint32_t i;
579         uint64_t old_max_version = 0;
580         bool do_pull = false;
581
582         for (i=state->current; i < state->table_io.out.num_owners; i++) {
583                 current_owner = wreplsrv_find_owner(state->io->in.partner->service,
584                                                     state->io->in.partner->pull.table,
585                                                     state->table_io.out.owners[i].address);
586
587                 local_owner = wreplsrv_find_owner(state->io->in.partner->service,
588                                                   state->io->in.partner->service->table,
589                                                   state->table_io.out.owners[i].address);
590                 /*
591                  * this means we are ourself the current owner,
592                  * and we don't want replicate ourself
593                  */
594                 if (!current_owner) continue;
595
596                 /*
597                  * this means we don't have any records of this owner
598                  * so fetch them
599                  */
600                 if (!local_owner) {
601                         do_pull         = true;
602                         
603                         break;
604                 }
605
606                 /*
607                  * this means the remote partner has some new records of this owner
608                  * fetch them
609                  */
610                 if (current_owner->owner.max_version > local_owner->owner.max_version) {
611                         do_pull         = true;
612                         old_max_version = local_owner->owner.max_version;
613                         break;
614                 }
615         }
616         state->current = i;
617
618         if (do_pull) {
619                 state->names_io.in.partner              = state->io->in.partner;
620                 state->names_io.in.wreplconn            = state->io->in.wreplconn;
621                 state->names_io.in.owner                = current_owner->owner;
622                 state->names_io.in.owner.min_version    = old_max_version + 1;
623                 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
624                 NT_STATUS_HAVE_NO_MEMORY(state->creq);
625
626                 state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
627                 state->creq->async.private_data = state;
628
629                 return STATUS_MORE_ENTRIES;
630         }
631
632         return NT_STATUS_OK;
633 }
634
635 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
636 {
637         NTSTATUS status;
638
639         status = wreplsrv_pull_cycle_next_owner_do_work(state);
640         if (NT_STATUS_IS_OK(status)) {
641                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
642         } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
643                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
644                 status = NT_STATUS_OK;
645         }
646
647         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
648                 state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
649                 state->assoc_stop_io.in.reason          = 0;
650                 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
651                 NT_STATUS_HAVE_NO_MEMORY(state->req);
652
653                 state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
654                 state->req->async.private_data  = state;
655
656                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
657         }
658
659         return status;
660 }
661
662 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
663 {
664         NTSTATUS status;
665         uint32_t i;
666
667         status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
668         NT_STATUS_NOT_OK_RETURN(status);
669
670         /* update partner table */
671         for (i=0; i < state->table_io.out.num_owners; i++) {
672                 status = wreplsrv_add_table(state->io->in.partner->service,
673                                             state->io->in.partner, 
674                                             &state->io->in.partner->pull.table,
675                                             state->table_io.out.owners[i].address,
676                                             state->table_io.out.owners[i].max_version);
677                 NT_STATUS_NOT_OK_RETURN(status);
678         }
679
680         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
681         NT_STATUS_NOT_OK_RETURN(status);
682
683         return status;
684 }
685
686 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
687 {
688         NTSTATUS status;
689
690         status = wreplsrv_apply_records(state->io->in.partner,
691                                         &state->names_io.in.owner,
692                                         state->names_io.out.num_names,
693                                         state->names_io.out.names);
694         NT_STATUS_NOT_OK_RETURN(status);
695
696         talloc_free(state->names_io.out.names);
697         ZERO_STRUCT(state->names_io);
698
699         return NT_STATUS_OK;
700 }
701
702 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
703 {
704         NTSTATUS status;
705
706         status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
707         NT_STATUS_NOT_OK_RETURN(status);
708
709         /*
710          * TODO: this should maybe an async call,
711          *       because we may need some network access
712          *       for conflict resolving
713          */
714         status = wreplsrv_pull_cycle_apply_records(state);
715         NT_STATUS_NOT_OK_RETURN(status);
716
717         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
718         NT_STATUS_NOT_OK_RETURN(status);
719
720         return status;
721 }
722
723 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
724 {
725         NTSTATUS status;
726
727         status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
728         NT_STATUS_NOT_OK_RETURN(status);
729
730         state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
731
732         return status;
733 }
734
735 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
736 {
737         struct composite_context *c = state->c;
738
739         switch (state->stage) {
740         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
741                 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
742                 break;
743         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
744                 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
745                 break;
746         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
747                 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
748                 break;
749         case WREPLSRV_PULL_CYCLE_STAGE_DONE:
750                 c->status = NT_STATUS_INTERNAL_ERROR;
751         }
752
753         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
754                 c->state  = COMPOSITE_STATE_DONE;
755         }
756
757         if (!NT_STATUS_IS_OK(c->status)) {
758                 c->state = COMPOSITE_STATE_ERROR;
759         }
760
761         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
762                 c->async.fn(c);
763         }
764 }
765
766 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
767 {
768         struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
769                                                   struct wreplsrv_pull_cycle_state);
770         wreplsrv_pull_cycle_handler(state);
771         return;
772 }
773
774 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
775 {
776         struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private_data,
777                                                   struct wreplsrv_pull_cycle_state);
778         wreplsrv_pull_cycle_handler(state);
779         return;
780 }
781
782 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
783 {
784         struct composite_context *c = NULL;
785         struct wreplsrv_service *service = io->in.partner->service;
786         struct wreplsrv_pull_cycle_state *state = NULL;
787
788         c = talloc_zero(mem_ctx, struct composite_context);
789         if (!c) goto failed;
790
791         state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
792         if (!state) goto failed;
793         state->c        = c;
794         state->io       = io;
795
796         c->state        = COMPOSITE_STATE_IN_PROGRESS;
797         c->event_ctx    = service->task->event_ctx;
798         c->private_data = state;
799
800         state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
801         state->table_io.in.partner      = io->in.partner;
802         state->table_io.in.num_owners   = io->in.num_owners;
803         state->table_io.in.owners       = io->in.owners;
804         state->creq = wreplsrv_pull_table_send(state, &state->table_io);
805         if (!state->creq) goto failed;
806
807         state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
808         state->creq->async.private_data = state;
809
810         return c;
811 failed:
812         talloc_free(c);
813         return NULL;
814 }
815
816 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
817 {
818         NTSTATUS status;
819
820         status = composite_wait(c);
821
822         talloc_free(c);
823         return status;
824 }
825
/* Stages of the push-notify state machine driven by wreplsrv_push_notify_handler(). */
826 enum wreplsrv_push_notify_stage {
827         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT, /* connect sub-request issued, result not yet collected */
828         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM, /* inform request sent, waiting for its completion */
829         WREPLSRV_PUSH_NOTIFY_STAGE_DONE /* finished; the handler treats being called in this stage as an internal error */
830 };
831
/*
 * State of one push notification; allocated by wreplsrv_push_notify_send()
 * as a talloc child of the composite context it hands back to the caller.
 */
832 struct wreplsrv_push_notify_state {
833         enum wreplsrv_push_notify_stage stage; /* current step of the state machine */
834         struct composite_context *c; /* composite context returned to the caller */
835         struct wreplsrv_push_notify_io *io; /* caller-supplied parameters (partner, inform, propagate) */
836         enum wrepl_replication_cmd command; /* UPDATE/UPDATE2/INFORM/INFORM2; INFORM* may be replaced by UPDATE* after connecting */
837         bool full_table; /* passed to wreplsrv_fill_wrepl_table(): true for UPDATE/INFORM, false for UPDATE2/INFORM2 */
838         struct wrepl_send_ctrl ctrl; /* send options; send_only is set for inform messages */
839         struct wrepl_request *req; /* request queued on the outgoing connection */
840         struct wrepl_packet req_packet; /* outgoing packet, filled in before sending */
841         struct wrepl_packet *rep_packet; /* NOTE(review): not referenced by the push-notify functions in this part of the file */
842         struct composite_context *creq; /* connect sub-request started by wreplsrv_push_notify_send() */
843         struct wreplsrv_out_connection *wreplconn; /* outgoing connection filled in by wreplsrv_out_connect_recv() */
844 };
845
/*
 * Forward declarations of the push-notify callbacks.
 * wreplsrv_push_notify_inform() installs the _req variant before its definition appears.
 */
846 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
847 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
848
849 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
850 {
851         struct wreplsrv_service *service = state->io->in.partner->service;
852         struct wrepl_packet *req = &state->req_packet;
853         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
854         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
855         struct wreplsrv_in_connection *wrepl_in;
856         NTSTATUS status;
857         struct socket_context *sock;
858         struct packet_context *packet;
859         uint16_t fde_flags;
860
861         /* prepare the outgoing request */
862         req->opcode     = WREPL_OPCODE_BITS;
863         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
864         req->mess_type  = WREPL_REPLICATION;
865
866         repl_out->command = state->command;
867
868         status = wreplsrv_fill_wrepl_table(service, state, table_out,
869                                            service->wins_db->local_owner, state->full_table);
870         NT_STATUS_NOT_OK_RETURN(status);
871
872         /* queue the request */
873         state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
874         NT_STATUS_HAVE_NO_MEMORY(state->req);
875
876         /*
877          * now we need to convert the wrepl_socket (client connection)
878          * into a wreplsrv_in_connection (server connection), because
879          * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
880          * message is received by the peer.
881          */
882
883         /* steal the socket_context */
884         sock = state->wreplconn->sock->sock;
885         state->wreplconn->sock->sock = NULL;
886         talloc_steal(state, sock);
887
888         /* 
889          * steal the packet_context
890          * note the request DATA_BLOB we just send on the
891          * wrepl_socket (client connection) is still unter the 
892          * packet context and will be send to the wire
893          */
894         packet = state->wreplconn->sock->packet;
895         state->wreplconn->sock->packet = NULL;
896         talloc_steal(state, packet);
897
898         /*
899          * get the fde_flags of the old fde event,
900          * so that we can later set the same flags to the new one
901          */
902         fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
903
904         /*
905          * free the wrepl_socket (client connection)
906          */
907         talloc_free(state->wreplconn->sock);
908         state->wreplconn->sock = NULL;
909
910         /*
911          * now create a wreplsrv_in_connection,
912          * on which we act as server
913          *
914          * NOTE: sock and packet will be stolen by
915          *       wreplsrv_in_connection_merge()
916          */
917         status = wreplsrv_in_connection_merge(state->io->in.partner,
918                                               sock, packet, &wrepl_in);
919         NT_STATUS_NOT_OK_RETURN(status);
920
921         event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
922
923         wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
924         wrepl_in->assoc_ctx.our_ctx     = 0;
925
926         /* now we can free the wreplsrv_out_connection */
927         talloc_free(state->wreplconn);
928         state->wreplconn = NULL;
929
930         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
931
932         return NT_STATUS_OK;
933 }
934
935 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
936 {
937         struct wreplsrv_service *service = state->io->in.partner->service;
938         struct wrepl_packet *req = &state->req_packet;
939         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
940         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
941         NTSTATUS status;
942
943         req->opcode     = WREPL_OPCODE_BITS;
944         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
945         req->mess_type  = WREPL_REPLICATION;
946
947         repl_out->command = state->command;
948
949         status = wreplsrv_fill_wrepl_table(service, state, table_out,
950                                            service->wins_db->local_owner, state->full_table);
951         NT_STATUS_NOT_OK_RETURN(status);
952
953         /* we won't get a reply to a inform message */
954         state->ctrl.send_only           = true;
955
956         state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
957         NT_STATUS_HAVE_NO_MEMORY(state->req);
958
959         state->req->async.fn            = wreplsrv_push_notify_handler_req;
960         state->req->async.private_data  = state;
961
962         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
963
964         return NT_STATUS_OK;
965 }
966
967 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
968 {
969         NTSTATUS status;
970
971         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
972         NT_STATUS_NOT_OK_RETURN(status);
973
974         /* is the peer doesn't support inform fallback to update */
975         switch (state->command) {
976         case WREPL_REPL_INFORM:
977                 if (state->wreplconn->assoc_ctx.peer_major < 5) {
978                         state->command = WREPL_REPL_UPDATE;
979                 }
980                 break;
981         case WREPL_REPL_INFORM2:
982                 if (state->wreplconn->assoc_ctx.peer_major < 5) {
983                         state->command = WREPL_REPL_UPDATE2;
984                 }
985                 break;
986         default:
987                 break;
988         }
989
990         switch (state->command) {
991         case WREPL_REPL_UPDATE:
992                 state->full_table = true;
993                 return wreplsrv_push_notify_update(state);
994         case WREPL_REPL_UPDATE2:
995                 state->full_table = false;
996                 return wreplsrv_push_notify_update(state);
997         case WREPL_REPL_INFORM:
998                 state->full_table = true;
999                 return wreplsrv_push_notify_inform(state);
1000         case WREPL_REPL_INFORM2:
1001                 state->full_table = false;
1002                 return wreplsrv_push_notify_inform(state);
1003         default:
1004                 return NT_STATUS_INTERNAL_ERROR;
1005         }
1006
1007         return NT_STATUS_INTERNAL_ERROR;
1008 }
1009
1010 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
1011 {
1012         NTSTATUS status;
1013
1014         status =  wrepl_request_recv(state->req, state, NULL);
1015         NT_STATUS_NOT_OK_RETURN(status);
1016
1017         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1018         return status;
1019 }
1020
1021 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1022 {
1023         struct composite_context *c = state->c;
1024
1025         switch (state->stage) {
1026         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1027                 c->status = wreplsrv_push_notify_wait_connect(state);
1028                 break;
1029         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1030                 c->status = wreplsrv_push_notify_wait_inform(state);
1031                 break;
1032         case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1033                 c->status = NT_STATUS_INTERNAL_ERROR;
1034         }
1035
1036         if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1037                 c->state  = COMPOSITE_STATE_DONE;
1038         }
1039
1040         if (!NT_STATUS_IS_OK(c->status)) {
1041                 c->state = COMPOSITE_STATE_ERROR;
1042         }
1043
1044         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1045                 c->async.fn(c);
1046         }
1047 }
1048
1049 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1050 {
1051         struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1052                                                    struct wreplsrv_push_notify_state);
1053         wreplsrv_push_notify_handler(state);
1054         return;
1055 }
1056
1057 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1058 {
1059         struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
1060                                                    struct wreplsrv_push_notify_state);
1061         wreplsrv_push_notify_handler(state);
1062         return;
1063 }
1064
1065 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1066 {
1067         struct composite_context *c = NULL;
1068         struct wreplsrv_service *service = io->in.partner->service;
1069         struct wreplsrv_push_notify_state *state = NULL;
1070         enum winsrepl_partner_type partner_type;
1071
1072         c = talloc_zero(mem_ctx, struct composite_context);
1073         if (!c) goto failed;
1074
1075         state = talloc_zero(c, struct wreplsrv_push_notify_state);
1076         if (!state) goto failed;
1077         state->c        = c;
1078         state->io       = io;
1079
1080         if (io->in.inform) {
1081                 /* we can cache the connection in partner->push->wreplconn */
1082                 partner_type = WINSREPL_PARTNER_PUSH;
1083                 if (io->in.propagate) {
1084                         state->command  = WREPL_REPL_INFORM2;
1085                 } else {
1086                         state->command  = WREPL_REPL_INFORM;
1087                 }
1088         } else {
1089                 /* we can NOT cache the connection */
1090                 partner_type = WINSREPL_PARTNER_NONE;
1091                 if (io->in.propagate) {
1092                         state->command  = WREPL_REPL_UPDATE2;
1093                 } else {
1094                         state->command  = WREPL_REPL_UPDATE;
1095                 }       
1096         }
1097
1098         c->state        = COMPOSITE_STATE_IN_PROGRESS;
1099         c->event_ctx    = service->task->event_ctx;
1100         c->private_data = state;
1101
1102         state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1103         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1104         if (!state->creq) goto failed;
1105
1106         state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
1107         state->creq->async.private_data = state;
1108
1109         return c;
1110 failed:
1111         talloc_free(c);
1112         return NULL;
1113 }
1114
1115 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1116 {
1117         NTSTATUS status;
1118
1119         status = composite_wait(c);
1120
1121         talloc_free(c);
1122         return status;
1123 }