s4:libcli/wrepl: convert wrepl_pull_table_send to tevent_req
[samba.git] / source4 / wrepl_server / wrepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
34
35 enum wreplsrv_out_connect_stage {
36         WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
37         WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
38         WREPLSRV_OUT_CONNECT_STAGE_DONE
39 };
40
41 struct wreplsrv_out_connect_state {
42         enum wreplsrv_out_connect_stage stage;
43         struct composite_context *c;
44         struct wrepl_request *req;
45         struct composite_context *c_req;
46         struct wrepl_associate assoc_io;
47         enum winsrepl_partner_type type;
48         struct wreplsrv_out_connection *wreplconn;
49 };
50
51 static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
52 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
53
54 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
55 {
56         NTSTATUS status;
57
58         status = wrepl_connect_recv(state->c_req);
59         NT_STATUS_NOT_OK_RETURN(status);
60
61         state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
62         NT_STATUS_HAVE_NO_MEMORY(state->req);
63
64         state->req->async.fn            = wreplsrv_out_connect_handler_req;
65         state->req->async.private_data  = state;
66
67         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
68
69         return NT_STATUS_OK;
70 }
71
72 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
73 {
74         NTSTATUS status;
75
76         status = wrepl_associate_recv(state->req, &state->assoc_io);
77         NT_STATUS_NOT_OK_RETURN(status);
78
79         state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
80         state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
81
82         if (state->type == WINSREPL_PARTNER_PUSH) {
83                 if (state->wreplconn->assoc_ctx.peer_major >= 5) {
84                         state->wreplconn->partner->push.wreplconn = state->wreplconn;
85                         talloc_steal(state->wreplconn->partner, state->wreplconn);
86                 } else {
87                         state->type = WINSREPL_PARTNER_NONE;
88                 }
89         } else if (state->type == WINSREPL_PARTNER_PULL) {
90                 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
91                 talloc_steal(state->wreplconn->partner, state->wreplconn);
92         }
93
94         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
95
96         return NT_STATUS_OK;
97 }
98
99 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
100 {
101         struct composite_context *c = state->c;
102
103         switch (state->stage) {
104         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
105                 c->status = wreplsrv_out_connect_wait_socket(state);
106                 break;
107         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
108                 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
109                 c->state  = COMPOSITE_STATE_DONE;
110                 break;
111         case WREPLSRV_OUT_CONNECT_STAGE_DONE:
112                 c->status = NT_STATUS_INTERNAL_ERROR;
113         }
114
115         if (!NT_STATUS_IS_OK(c->status)) {
116                 c->state = COMPOSITE_STATE_ERROR;
117         }
118
119         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
120                 c->async.fn(c);
121         }
122 }
123
124 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
125 {
126         struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
127                                                    struct wreplsrv_out_connect_state);
128         wreplsrv_out_connect_handler(state);
129         return;
130 }
131
132 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
133 {
134         struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private_data,
135                                                    struct wreplsrv_out_connect_state);
136         wreplsrv_out_connect_handler(state);
137         return;
138 }
139
140 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
141                                                            enum winsrepl_partner_type type,
142                                                            struct wreplsrv_out_connection *wreplconn)
143 {
144         struct composite_context *c = NULL;
145         struct wreplsrv_service *service = partner->service;
146         struct wreplsrv_out_connect_state *state = NULL;
147         struct wreplsrv_out_connection **wreplconnp = &wreplconn;
148         bool cached_connection = false;
149
150         c = talloc_zero(partner, struct composite_context);
151         if (!c) goto failed;
152
153         state = talloc_zero(c, struct wreplsrv_out_connect_state);
154         if (!state) goto failed;
155         state->c        = c;
156         state->type     = type;
157
158         c->state        = COMPOSITE_STATE_IN_PROGRESS;
159         c->event_ctx    = service->task->event_ctx;
160         c->private_data = state;
161
162         if (type == WINSREPL_PARTNER_PUSH) {
163                 cached_connection       = true;
164                 wreplconn               = partner->push.wreplconn;
165                 wreplconnp              = &partner->push.wreplconn;
166         } else if (type == WINSREPL_PARTNER_PULL) {
167                 cached_connection       = true;
168                 wreplconn               = partner->pull.wreplconn;
169                 wreplconnp              = &partner->pull.wreplconn;
170         }
171
172         /* we have a connection already, so use it */
173         if (wreplconn) {
174                 if (!wreplconn->sock->dead) {
175                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
176                         state->wreplconn= wreplconn;
177                         composite_done(c);
178                         return c;
179                 } else if (!cached_connection) {
180                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
181                         state->wreplconn= NULL;
182                         composite_done(c);
183                         return c;
184                 } else {
185                         talloc_free(wreplconn);
186                         *wreplconnp = NULL;
187                 }
188         }
189
190         wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
191         if (!wreplconn) goto failed;
192
193         wreplconn->service      = service;
194         wreplconn->partner      = partner;
195         wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
196         if (!wreplconn->sock) goto failed;
197
198         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
199         state->wreplconn= wreplconn;
200         state->c_req    = wrepl_connect_send(wreplconn->sock,
201                                              partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
202                                              partner->address);
203         if (!state->c_req) goto failed;
204
205         state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
206         state->c_req->async.private_data        = state;
207
208         return c;
209 failed:
210         talloc_free(c);
211         return NULL;
212 }
213
214 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
215                                           struct wreplsrv_out_connection **wreplconn)
216 {
217         NTSTATUS status;
218
219         status = composite_wait(c);
220
221         if (NT_STATUS_IS_OK(status)) {
222                 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
223                                                            struct wreplsrv_out_connect_state);
224                 if (state->wreplconn) {
225                         *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
226                         if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
227                 } else {
228                         status = NT_STATUS_INVALID_CONNECTION;
229                 }
230         }
231
232         talloc_free(c);
233         return status;
234         
235 }
236
237 struct wreplsrv_pull_table_io {
238         struct {
239                 struct wreplsrv_partner *partner;
240                 uint32_t num_owners;
241                 struct wrepl_wins_owner *owners;
242         } in;
243         struct {
244                 uint32_t num_owners;
245                 struct wrepl_wins_owner *owners;
246         } out;
247 };
248
249 enum wreplsrv_pull_table_stage {
250         WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
251         WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
252         WREPLSRV_PULL_TABLE_STAGE_DONE
253 };
254
255 struct wreplsrv_pull_table_state {
256         enum wreplsrv_pull_table_stage stage;
257         struct composite_context *c;
258         struct wrepl_pull_table table_io;
259         struct wreplsrv_pull_table_io *io;
260         struct composite_context *creq;
261         struct wreplsrv_out_connection *wreplconn;
262         struct tevent_req *subreq;
263 };
264
265 static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq);
266
267 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
268 {
269         NTSTATUS status;
270
271         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
272         NT_STATUS_NOT_OK_RETURN(status);
273
274         state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
275         state->subreq = wrepl_pull_table_send(state,
276                                               state->wreplconn->service->task->event_ctx,
277                                               state->wreplconn->sock, &state->table_io);
278         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
279
280         tevent_req_set_callback(state->subreq,
281                                 wreplsrv_pull_table_handler_treq,
282                                 state);
283
284         state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
285
286         return NT_STATUS_OK;
287 }
288
289 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
290 {
291         NTSTATUS status;
292
293         status = wrepl_pull_table_recv(state->subreq, state, &state->table_io);
294         TALLOC_FREE(state->subreq);
295         NT_STATUS_NOT_OK_RETURN(status);
296
297         state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
298
299         return NT_STATUS_OK;
300 }
301
302 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
303 {
304         struct composite_context *c = state->c;
305
306         switch (state->stage) {
307         case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
308                 c->status = wreplsrv_pull_table_wait_connection(state);
309                 break;
310         case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
311                 c->status = wreplsrv_pull_table_wait_table_reply(state);
312                 c->state  = COMPOSITE_STATE_DONE;
313                 break;
314         case WREPLSRV_PULL_TABLE_STAGE_DONE:
315                 c->status = NT_STATUS_INTERNAL_ERROR;
316         }
317
318         if (!NT_STATUS_IS_OK(c->status)) {
319                 c->state = COMPOSITE_STATE_ERROR;
320         }
321
322         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
323                 c->async.fn(c);
324         }
325 }
326
327 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
328 {
329         struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
330                                                   struct wreplsrv_pull_table_state);
331         wreplsrv_pull_table_handler(state);
332         return;
333 }
334
335 static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq)
336 {
337         struct wreplsrv_pull_table_state *state = tevent_req_callback_data(subreq,
338                                                   struct wreplsrv_pull_table_state);
339         wreplsrv_pull_table_handler(state);
340         return;
341 }
342
343 static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
344 {
345         struct composite_context *c = NULL;
346         struct wreplsrv_service *service = io->in.partner->service;
347         struct wreplsrv_pull_table_state *state = NULL;
348
349         c = talloc_zero(mem_ctx, struct composite_context);
350         if (!c) goto failed;
351
352         state = talloc_zero(c, struct wreplsrv_pull_table_state);
353         if (!state) goto failed;
354         state->c        = c;
355         state->io       = io;
356
357         c->state        = COMPOSITE_STATE_IN_PROGRESS;
358         c->event_ctx    = service->task->event_ctx;
359         c->private_data = state;
360
361         if (io->in.num_owners) {
362                 state->table_io.out.num_partners        = io->in.num_owners;
363                 state->table_io.out.partners            = io->in.owners;
364                 state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
365                 composite_done(c);
366                 return c;
367         }
368
369         state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
370         state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
371         if (!state->creq) goto failed;
372
373         state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
374         state->creq->async.private_data = state;
375
376         return c;
377 failed:
378         talloc_free(c);
379         return NULL;
380 }
381
382 static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
383                                          struct wreplsrv_pull_table_io *io)
384 {
385         NTSTATUS status;
386
387         status = composite_wait(c);
388
389         if (NT_STATUS_IS_OK(status)) {
390                 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
391                                                           struct wreplsrv_pull_table_state);
392                 io->out.num_owners      = state->table_io.out.num_partners;
393                 io->out.owners          = talloc_reference(mem_ctx, state->table_io.out.partners);
394         }
395
396         talloc_free(c);
397         return status;  
398 }
399
400 struct wreplsrv_pull_names_io {
401         struct {
402                 struct wreplsrv_partner *partner;
403                 struct wreplsrv_out_connection *wreplconn;
404                 struct wrepl_wins_owner owner;
405         } in;
406         struct {
407                 uint32_t num_names;
408                 struct wrepl_name *names;
409         } out;
410 };
411
412 enum wreplsrv_pull_names_stage {
413         WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
414         WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
415         WREPLSRV_PULL_NAMES_STAGE_DONE
416 };
417
418 struct wreplsrv_pull_names_state {
419         enum wreplsrv_pull_names_stage stage;
420         struct composite_context *c;
421         struct wrepl_pull_names pull_io;
422         struct wreplsrv_pull_names_io *io;
423         struct composite_context *creq;
424         struct wreplsrv_out_connection *wreplconn;
425         struct tevent_req *subreq;
426 };
427
428 static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq);
429
430 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
431 {
432         NTSTATUS status;
433
434         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
435         NT_STATUS_NOT_OK_RETURN(status);
436
437         state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
438         state->pull_io.in.partner       = state->io->in.owner;
439         state->subreq = wrepl_pull_names_send(state,
440                                               state->wreplconn->service->task->event_ctx,
441                                               state->wreplconn->sock,
442                                               &state->pull_io);
443         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
444
445         tevent_req_set_callback(state->subreq,
446                                 wreplsrv_pull_names_handler_treq,
447                                 state);
448
449         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
450
451         return NT_STATUS_OK;
452 }
453
454 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
455 {
456         NTSTATUS status;
457
458         status = wrepl_pull_names_recv(state->subreq, state, &state->pull_io);
459         TALLOC_FREE(state->subreq);
460         NT_STATUS_NOT_OK_RETURN(status);
461
462         state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
463
464         return NT_STATUS_OK;
465 }
466
467 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
468 {
469         struct composite_context *c = state->c;
470
471         switch (state->stage) {
472         case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
473                 c->status = wreplsrv_pull_names_wait_connection(state);
474                 break;
475         case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
476                 c->status = wreplsrv_pull_names_wait_send_reply(state);
477                 c->state  = COMPOSITE_STATE_DONE;
478                 break;
479         case WREPLSRV_PULL_NAMES_STAGE_DONE:
480                 c->status = NT_STATUS_INTERNAL_ERROR;
481         }
482
483         if (!NT_STATUS_IS_OK(c->status)) {
484                 c->state = COMPOSITE_STATE_ERROR;
485         }
486
487         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
488                 c->async.fn(c);
489         }
490 }
491
492 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
493 {
494         struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
495                                                   struct wreplsrv_pull_names_state);
496         wreplsrv_pull_names_handler(state);
497         return;
498 }
499
500 static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq)
501 {
502         struct wreplsrv_pull_names_state *state = tevent_req_callback_data(subreq,
503                                                   struct wreplsrv_pull_names_state);
504         wreplsrv_pull_names_handler(state);
505         return;
506 }
507
508 static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
509 {
510         struct composite_context *c = NULL;
511         struct wreplsrv_service *service = io->in.partner->service;
512         struct wreplsrv_pull_names_state *state = NULL;
513         enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
514
515         if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
516
517         c = talloc_zero(mem_ctx, struct composite_context);
518         if (!c) goto failed;
519
520         state = talloc_zero(c, struct wreplsrv_pull_names_state);
521         if (!state) goto failed;
522         state->c        = c;
523         state->io       = io;
524
525         c->state        = COMPOSITE_STATE_IN_PROGRESS;
526         c->event_ctx    = service->task->event_ctx;
527         c->private_data = state;
528
529         state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
530         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
531         if (!state->creq) goto failed;
532
533         state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
534         state->creq->async.private_data = state;
535
536         return c;
537 failed:
538         talloc_free(c);
539         return NULL;
540 }
541
542 static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
543                                          struct wreplsrv_pull_names_io *io)
544 {
545         NTSTATUS status;
546
547         status = composite_wait(c);
548
549         if (NT_STATUS_IS_OK(status)) {
550                 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
551                                                           struct wreplsrv_pull_names_state);
552                 io->out.num_names       = state->pull_io.out.num_names;
553                 io->out.names           = talloc_reference(mem_ctx, state->pull_io.out.names);
554         }
555
556         talloc_free(c);
557         return status;
558         
559 }
560
561 enum wreplsrv_pull_cycle_stage {
562         WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
563         WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
564         WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
565         WREPLSRV_PULL_CYCLE_STAGE_DONE
566 };
567
568 struct wreplsrv_pull_cycle_state {
569         enum wreplsrv_pull_cycle_stage stage;
570         struct composite_context *c;
571         struct wreplsrv_pull_cycle_io *io;
572         struct wreplsrv_pull_table_io table_io;
573         uint32_t current;
574         struct wreplsrv_pull_names_io names_io;
575         struct composite_context *creq;
576         struct wrepl_associate_stop assoc_stop_io;
577         struct wrepl_request *req;
578 };
579
580 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
581 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
582
583 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
584 {
585         struct wreplsrv_owner *current_owner=NULL;
586         struct wreplsrv_owner *local_owner;
587         uint32_t i;
588         uint64_t old_max_version = 0;
589         bool do_pull = false;
590
591         for (i=state->current; i < state->table_io.out.num_owners; i++) {
592                 current_owner = wreplsrv_find_owner(state->io->in.partner->service,
593                                                     state->io->in.partner->pull.table,
594                                                     state->table_io.out.owners[i].address);
595
596                 local_owner = wreplsrv_find_owner(state->io->in.partner->service,
597                                                   state->io->in.partner->service->table,
598                                                   state->table_io.out.owners[i].address);
599                 /*
600                  * this means we are ourself the current owner,
601                  * and we don't want replicate ourself
602                  */
603                 if (!current_owner) continue;
604
605                 /*
606                  * this means we don't have any records of this owner
607                  * so fetch them
608                  */
609                 if (!local_owner) {
610                         do_pull         = true;
611                         
612                         break;
613                 }
614
615                 /*
616                  * this means the remote partner has some new records of this owner
617                  * fetch them
618                  */
619                 if (current_owner->owner.max_version > local_owner->owner.max_version) {
620                         do_pull         = true;
621                         old_max_version = local_owner->owner.max_version;
622                         break;
623                 }
624         }
625         state->current = i;
626
627         if (do_pull) {
628                 state->names_io.in.partner              = state->io->in.partner;
629                 state->names_io.in.wreplconn            = state->io->in.wreplconn;
630                 state->names_io.in.owner                = current_owner->owner;
631                 state->names_io.in.owner.min_version    = old_max_version + 1;
632                 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
633                 NT_STATUS_HAVE_NO_MEMORY(state->creq);
634
635                 state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
636                 state->creq->async.private_data = state;
637
638                 return STATUS_MORE_ENTRIES;
639         }
640
641         return NT_STATUS_OK;
642 }
643
644 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
645 {
646         NTSTATUS status;
647
648         status = wreplsrv_pull_cycle_next_owner_do_work(state);
649         if (NT_STATUS_IS_OK(status)) {
650                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
651         } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
652                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
653                 status = NT_STATUS_OK;
654         }
655
656         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
657                 state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
658                 state->assoc_stop_io.in.reason          = 0;
659                 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
660                 NT_STATUS_HAVE_NO_MEMORY(state->req);
661
662                 state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
663                 state->req->async.private_data  = state;
664
665                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
666         }
667
668         return status;
669 }
670
671 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
672 {
673         NTSTATUS status;
674         uint32_t i;
675
676         status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
677         NT_STATUS_NOT_OK_RETURN(status);
678
679         /* update partner table */
680         for (i=0; i < state->table_io.out.num_owners; i++) {
681                 status = wreplsrv_add_table(state->io->in.partner->service,
682                                             state->io->in.partner, 
683                                             &state->io->in.partner->pull.table,
684                                             state->table_io.out.owners[i].address,
685                                             state->table_io.out.owners[i].max_version);
686                 NT_STATUS_NOT_OK_RETURN(status);
687         }
688
689         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
690         NT_STATUS_NOT_OK_RETURN(status);
691
692         return status;
693 }
694
695 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
696 {
697         NTSTATUS status;
698
699         status = wreplsrv_apply_records(state->io->in.partner,
700                                         &state->names_io.in.owner,
701                                         state->names_io.out.num_names,
702                                         state->names_io.out.names);
703         NT_STATUS_NOT_OK_RETURN(status);
704
705         talloc_free(state->names_io.out.names);
706         ZERO_STRUCT(state->names_io);
707
708         return NT_STATUS_OK;
709 }
710
711 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
712 {
713         NTSTATUS status;
714
715         status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
716         NT_STATUS_NOT_OK_RETURN(status);
717
718         /*
719          * TODO: this should maybe an async call,
720          *       because we may need some network access
721          *       for conflict resolving
722          */
723         status = wreplsrv_pull_cycle_apply_records(state);
724         NT_STATUS_NOT_OK_RETURN(status);
725
726         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
727         NT_STATUS_NOT_OK_RETURN(status);
728
729         return status;
730 }
731
732 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
733 {
734         NTSTATUS status;
735
736         status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
737         NT_STATUS_NOT_OK_RETURN(status);
738
739         state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
740
741         return status;
742 }
743
744 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
745 {
746         struct composite_context *c = state->c;
747
748         switch (state->stage) {
749         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
750                 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
751                 break;
752         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
753                 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
754                 break;
755         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
756                 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
757                 break;
758         case WREPLSRV_PULL_CYCLE_STAGE_DONE:
759                 c->status = NT_STATUS_INTERNAL_ERROR;
760         }
761
762         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
763                 c->state  = COMPOSITE_STATE_DONE;
764         }
765
766         if (!NT_STATUS_IS_OK(c->status)) {
767                 c->state = COMPOSITE_STATE_ERROR;
768         }
769
770         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
771                 c->async.fn(c);
772         }
773 }
774
775 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
776 {
777         struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
778                                                   struct wreplsrv_pull_cycle_state);
779         wreplsrv_pull_cycle_handler(state);
780         return;
781 }
782
783 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
784 {
785         struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private_data,
786                                                   struct wreplsrv_pull_cycle_state);
787         wreplsrv_pull_cycle_handler(state);
788         return;
789 }
790
791 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
792 {
793         struct composite_context *c = NULL;
794         struct wreplsrv_service *service = io->in.partner->service;
795         struct wreplsrv_pull_cycle_state *state = NULL;
796
797         c = talloc_zero(mem_ctx, struct composite_context);
798         if (!c) goto failed;
799
800         state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
801         if (!state) goto failed;
802         state->c        = c;
803         state->io       = io;
804
805         c->state        = COMPOSITE_STATE_IN_PROGRESS;
806         c->event_ctx    = service->task->event_ctx;
807         c->private_data = state;
808
809         state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
810         state->table_io.in.partner      = io->in.partner;
811         state->table_io.in.num_owners   = io->in.num_owners;
812         state->table_io.in.owners       = io->in.owners;
813         state->creq = wreplsrv_pull_table_send(state, &state->table_io);
814         if (!state->creq) goto failed;
815
816         state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
817         state->creq->async.private_data = state;
818
819         return c;
820 failed:
821         talloc_free(c);
822         return NULL;
823 }
824
825 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
826 {
827         NTSTATUS status;
828
829         status = composite_wait(c);
830
831         talloc_free(c);
832         return status;
833 }
834
835 enum wreplsrv_push_notify_stage {
836         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
837         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
838         WREPLSRV_PUSH_NOTIFY_STAGE_DONE
839 };
840
841 struct wreplsrv_push_notify_state {
842         enum wreplsrv_push_notify_stage stage;
843         struct composite_context *c;
844         struct wreplsrv_push_notify_io *io;
845         enum wrepl_replication_cmd command;
846         bool full_table;
847         struct wrepl_send_ctrl ctrl;
848         struct wrepl_request *req;
849         struct wrepl_packet req_packet;
850         struct wrepl_packet *rep_packet;
851         struct composite_context *creq;
852         struct wreplsrv_out_connection *wreplconn;
853 };
854
855 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
856 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
857
858 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
859 {
860         struct wreplsrv_service *service = state->io->in.partner->service;
861         struct wrepl_packet *req = &state->req_packet;
862         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
863         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
864         struct wreplsrv_in_connection *wrepl_in;
865         NTSTATUS status;
866         struct socket_context *sock;
867
868         /* prepare the outgoing request */
869         req->opcode     = WREPL_OPCODE_BITS;
870         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
871         req->mess_type  = WREPL_REPLICATION;
872
873         repl_out->command = state->command;
874
875         status = wreplsrv_fill_wrepl_table(service, state, table_out,
876                                            service->wins_db->local_owner, state->full_table);
877         NT_STATUS_NOT_OK_RETURN(status);
878
879         /* queue the request */
880         state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
881         NT_STATUS_HAVE_NO_MEMORY(state->req);
882
883         /*
884          * now we need to convert the wrepl_socket (client connection)
885          * into a wreplsrv_in_connection (server connection), because
886          * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
887          * message is received by the peer.
888          */
889
890         /* steal the socket_context */
891         sock = state->wreplconn->sock->sock;
892         state->wreplconn->sock->sock = NULL;
893         talloc_steal(state, sock);
894
895         /*
896          * TODO: steal the tstream if we switch the client to tsocket.
897          * This is just to get a compiler error as soon as we remove
898          * packet_context.
899          */
900         state->wreplconn->sock->packet = NULL;
901
902         /*
903          * free the wrepl_socket (client connection)
904          */
905         talloc_free(state->wreplconn->sock);
906         state->wreplconn->sock = NULL;
907
908         /*
909          * now create a wreplsrv_in_connection,
910          * on which we act as server
911          *
912          * NOTE: sock and packet will be stolen by
913          *       wreplsrv_in_connection_merge()
914          */
915         status = wreplsrv_in_connection_merge(state->io->in.partner,
916                                               sock, &wrepl_in);
917         NT_STATUS_NOT_OK_RETURN(status);
918
919         wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
920         wrepl_in->assoc_ctx.our_ctx     = 0;
921
922         /* now we can free the wreplsrv_out_connection */
923         TALLOC_FREE(state->wreplconn);
924
925         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
926
927         return NT_STATUS_OK;
928 }
929
930 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
931 {
932         struct wreplsrv_service *service = state->io->in.partner->service;
933         struct wrepl_packet *req = &state->req_packet;
934         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
935         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
936         NTSTATUS status;
937
938         req->opcode     = WREPL_OPCODE_BITS;
939         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
940         req->mess_type  = WREPL_REPLICATION;
941
942         repl_out->command = state->command;
943
944         status = wreplsrv_fill_wrepl_table(service, state, table_out,
945                                            service->wins_db->local_owner, state->full_table);
946         NT_STATUS_NOT_OK_RETURN(status);
947
948         /* we won't get a reply to a inform message */
949         state->ctrl.send_only           = true;
950
951         state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
952         NT_STATUS_HAVE_NO_MEMORY(state->req);
953
954         state->req->async.fn            = wreplsrv_push_notify_handler_req;
955         state->req->async.private_data  = state;
956
957         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
958
959         return NT_STATUS_OK;
960 }
961
962 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
963 {
964         NTSTATUS status;
965
966         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
967         NT_STATUS_NOT_OK_RETURN(status);
968
969         /* is the peer doesn't support inform fallback to update */
970         switch (state->command) {
971         case WREPL_REPL_INFORM:
972                 if (state->wreplconn->assoc_ctx.peer_major < 5) {
973                         state->command = WREPL_REPL_UPDATE;
974                 }
975                 break;
976         case WREPL_REPL_INFORM2:
977                 if (state->wreplconn->assoc_ctx.peer_major < 5) {
978                         state->command = WREPL_REPL_UPDATE2;
979                 }
980                 break;
981         default:
982                 break;
983         }
984
985         switch (state->command) {
986         case WREPL_REPL_UPDATE:
987                 state->full_table = true;
988                 return wreplsrv_push_notify_update(state);
989         case WREPL_REPL_UPDATE2:
990                 state->full_table = false;
991                 return wreplsrv_push_notify_update(state);
992         case WREPL_REPL_INFORM:
993                 state->full_table = true;
994                 return wreplsrv_push_notify_inform(state);
995         case WREPL_REPL_INFORM2:
996                 state->full_table = false;
997                 return wreplsrv_push_notify_inform(state);
998         default:
999                 return NT_STATUS_INTERNAL_ERROR;
1000         }
1001
1002         return NT_STATUS_INTERNAL_ERROR;
1003 }
1004
1005 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
1006 {
1007         NTSTATUS status;
1008
1009         status =  wrepl_request_recv(state->req, state, NULL);
1010         NT_STATUS_NOT_OK_RETURN(status);
1011
1012         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1013         return status;
1014 }
1015
1016 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1017 {
1018         struct composite_context *c = state->c;
1019
1020         switch (state->stage) {
1021         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1022                 c->status = wreplsrv_push_notify_wait_connect(state);
1023                 break;
1024         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1025                 c->status = wreplsrv_push_notify_wait_inform(state);
1026                 break;
1027         case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1028                 c->status = NT_STATUS_INTERNAL_ERROR;
1029         }
1030
1031         if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1032                 c->state  = COMPOSITE_STATE_DONE;
1033         }
1034
1035         if (!NT_STATUS_IS_OK(c->status)) {
1036                 c->state = COMPOSITE_STATE_ERROR;
1037         }
1038
1039         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1040                 c->async.fn(c);
1041         }
1042 }
1043
1044 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1045 {
1046         struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1047                                                    struct wreplsrv_push_notify_state);
1048         wreplsrv_push_notify_handler(state);
1049         return;
1050 }
1051
1052 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1053 {
1054         struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
1055                                                    struct wreplsrv_push_notify_state);
1056         wreplsrv_push_notify_handler(state);
1057         return;
1058 }
1059
1060 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1061 {
1062         struct composite_context *c = NULL;
1063         struct wreplsrv_service *service = io->in.partner->service;
1064         struct wreplsrv_push_notify_state *state = NULL;
1065         enum winsrepl_partner_type partner_type;
1066
1067         c = talloc_zero(mem_ctx, struct composite_context);
1068         if (!c) goto failed;
1069
1070         state = talloc_zero(c, struct wreplsrv_push_notify_state);
1071         if (!state) goto failed;
1072         state->c        = c;
1073         state->io       = io;
1074
1075         if (io->in.inform) {
1076                 /* we can cache the connection in partner->push->wreplconn */
1077                 partner_type = WINSREPL_PARTNER_PUSH;
1078                 if (io->in.propagate) {
1079                         state->command  = WREPL_REPL_INFORM2;
1080                 } else {
1081                         state->command  = WREPL_REPL_INFORM;
1082                 }
1083         } else {
1084                 /* we can NOT cache the connection */
1085                 partner_type = WINSREPL_PARTNER_NONE;
1086                 if (io->in.propagate) {
1087                         state->command  = WREPL_REPL_UPDATE2;
1088                 } else {
1089                         state->command  = WREPL_REPL_UPDATE;
1090                 }       
1091         }
1092
1093         c->state        = COMPOSITE_STATE_IN_PROGRESS;
1094         c->event_ctx    = service->task->event_ctx;
1095         c->private_data = state;
1096
1097         state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1098         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1099         if (!state->creq) goto failed;
1100
1101         state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
1102         state->creq->async.private_data = state;
1103
1104         return c;
1105 failed:
1106         talloc_free(c);
1107         return NULL;
1108 }
1109
1110 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1111 {
1112         NTSTATUS status;
1113
1114         status = composite_wait(c);
1115
1116         talloc_free(c);
1117         return status;
1118 }