r23792: convert Samba4 to GPLv3
[jelmer/samba4-debian.git] / source / wrepl_server / wrepl_out_helpers.c
/*
   Unix SMB/CIFS implementation.

   WINS Replication server

   Copyright (C) Stefan Metzmacher      2005

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32
/*
 * Steps of the outgoing-connection composite request,
 * listed in the order the state machine walks through them.
 */
enum wreplsrv_out_connect_stage {
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET    = 0,	/* connect request pending */
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX = 1,	/* associate request pending */
	WREPLSRV_OUT_CONNECT_STAGE_DONE           = 2	/* nothing left to wait for */
};
38
39 struct wreplsrv_out_connect_state {
40         enum wreplsrv_out_connect_stage stage;
41         struct composite_context *c;
42         struct wrepl_request *req;
43         struct composite_context *c_req;
44         struct wrepl_associate assoc_io;
45         enum winsrepl_partner_type type;
46         struct wreplsrv_out_connection *wreplconn;
47 };
48
/* Completion callbacks (defined further down); both re-enter wreplsrv_out_connect_handler(). */
static void wreplsrv_out_connect_handler_creq(struct composite_context *creq);
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
51
52 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
53 {
54         NTSTATUS status;
55
56         status = wrepl_connect_recv(state->c_req);
57         NT_STATUS_NOT_OK_RETURN(status);
58
59         state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
60         NT_STATUS_HAVE_NO_MEMORY(state->req);
61
62         state->req->async.fn            = wreplsrv_out_connect_handler_req;
63         state->req->async.private       = state;
64
65         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
66
67         return NT_STATUS_OK;
68 }
69
70 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
71 {
72         NTSTATUS status;
73
74         status = wrepl_associate_recv(state->req, &state->assoc_io);
75         NT_STATUS_NOT_OK_RETURN(status);
76
77         state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
78
79         if (state->type == WINSREPL_PARTNER_PUSH) {
80                 state->wreplconn->partner->push.wreplconn = state->wreplconn;
81                 talloc_steal(state->wreplconn->partner, state->wreplconn);
82         } else if (state->type == WINSREPL_PARTNER_PULL) {
83                 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
84                 talloc_steal(state->wreplconn->partner, state->wreplconn);
85         }
86
87         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
88
89         return NT_STATUS_OK;
90 }
91
92 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
93 {
94         struct composite_context *c = state->c;
95
96         switch (state->stage) {
97         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
98                 c->status = wreplsrv_out_connect_wait_socket(state);
99                 break;
100         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
101                 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
102                 c->state  = COMPOSITE_STATE_DONE;
103                 break;
104         case WREPLSRV_OUT_CONNECT_STAGE_DONE:
105                 c->status = NT_STATUS_INTERNAL_ERROR;
106         }
107
108         if (!NT_STATUS_IS_OK(c->status)) {
109                 c->state = COMPOSITE_STATE_ERROR;
110         }
111
112         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
113                 c->async.fn(c);
114         }
115 }
116
117 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
118 {
119         struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
120                                                    struct wreplsrv_out_connect_state);
121         wreplsrv_out_connect_handler(state);
122         return;
123 }
124
125 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
126 {
127         struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private,
128                                                    struct wreplsrv_out_connect_state);
129         wreplsrv_out_connect_handler(state);
130         return;
131 }
132
133 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
134                                                            enum winsrepl_partner_type type,
135                                                            struct wreplsrv_out_connection *wreplconn)
136 {
137         struct composite_context *c = NULL;
138         struct wreplsrv_service *service = partner->service;
139         struct wreplsrv_out_connect_state *state = NULL;
140         struct wreplsrv_out_connection **wreplconnp = &wreplconn;
141         BOOL cached_connection = False;
142
143         c = talloc_zero(partner, struct composite_context);
144         if (!c) goto failed;
145
146         state = talloc_zero(c, struct wreplsrv_out_connect_state);
147         if (!state) goto failed;
148         state->c        = c;
149         state->type     = type;
150
151         c->state        = COMPOSITE_STATE_IN_PROGRESS;
152         c->event_ctx    = service->task->event_ctx;
153         c->private_data = state;
154
155         if (type == WINSREPL_PARTNER_PUSH) {
156                 cached_connection       = True;
157                 wreplconn               = partner->push.wreplconn;
158                 wreplconnp              = &partner->push.wreplconn;
159         } else if (type == WINSREPL_PARTNER_PULL) {
160                 cached_connection       = True;
161                 wreplconn               = partner->pull.wreplconn;
162                 wreplconnp              = &partner->pull.wreplconn;
163         }
164
165         /* we have a connection already, so use it */
166         if (wreplconn) {
167                 if (!wreplconn->sock->dead) {
168                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
169                         state->wreplconn= wreplconn;
170                         composite_done(c);
171                         return c;
172                 } else if (!cached_connection) {
173                         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_DONE;
174                         state->wreplconn= NULL;
175                         composite_done(c);
176                         return c;
177                 } else {
178                         talloc_free(wreplconn);
179                         *wreplconnp = NULL;
180                 }
181         }
182
183         wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
184         if (!wreplconn) goto failed;
185
186         wreplconn->service      = service;
187         wreplconn->partner      = partner;
188         wreplconn->sock         = wrepl_socket_init(wreplconn, service->task->event_ctx);
189         if (!wreplconn->sock) goto failed;
190
191         state->stage    = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
192         state->wreplconn= wreplconn;
193         state->c_req    = wrepl_connect_send(wreplconn->sock,
194                                              partner->our_address,
195                                              partner->address);
196         if (!state->c_req) goto failed;
197
198         state->c_req->async.fn                  = wreplsrv_out_connect_handler_creq;
199         state->c_req->async.private_data        = state;
200
201         return c;
202 failed:
203         talloc_free(c);
204         return NULL;
205 }
206
207 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
208                                           struct wreplsrv_out_connection **wreplconn)
209 {
210         NTSTATUS status;
211
212         status = composite_wait(c);
213
214         if (NT_STATUS_IS_OK(status)) {
215                 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
216                                                            struct wreplsrv_out_connect_state);
217                 if (state->wreplconn) {
218                         *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
219                         if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
220                 } else {
221                         status = NT_STATUS_INVALID_CONNECTION;
222                 }
223         }
224
225         talloc_free(c);
226         return status;
227         
228 }
229
/* Parameters and result of the "pull owner table" composite request. */
struct wreplsrv_pull_table_io {
	struct {
		/* partner to query */
		struct wreplsrv_partner *partner;
		/* when non-zero, owners[] is returned as the result without asking the partner */
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} in;
	struct {
		/* resulting owner table */
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} out;
};
241
/* Steps of the "pull owner table" composite request, in execution order. */
enum wreplsrv_pull_table_stage {
	WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION  = 0,	/* connection request pending */
	WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY = 1,	/* table request pending */
	WREPLSRV_PULL_TABLE_STAGE_DONE             = 2	/* nothing left to wait for */
};
247
248 struct wreplsrv_pull_table_state {
249         enum wreplsrv_pull_table_stage stage;
250         struct composite_context *c;
251         struct wrepl_request *req;
252         struct wrepl_pull_table table_io;
253         struct wreplsrv_pull_table_io *io;
254         struct composite_context *creq;
255         struct wreplsrv_out_connection *wreplconn;
256 };
257
/* Completion callback of the table request (defined further down). */
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
259
260 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
261 {
262         NTSTATUS status;
263
264         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
265         NT_STATUS_NOT_OK_RETURN(status);
266
267         state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
268         state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io);
269         NT_STATUS_HAVE_NO_MEMORY(state->req);
270
271         state->req->async.fn            = wreplsrv_pull_table_handler_req;
272         state->req->async.private       = state;
273
274         state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
275
276         return NT_STATUS_OK;
277 }
278
279 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
280 {
281         NTSTATUS status;
282
283         status = wrepl_pull_table_recv(state->req, state, &state->table_io);
284         NT_STATUS_NOT_OK_RETURN(status);
285
286         state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
287
288         return NT_STATUS_OK;
289 }
290
291 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
292 {
293         struct composite_context *c = state->c;
294
295         switch (state->stage) {
296         case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
297                 c->status = wreplsrv_pull_table_wait_connection(state);
298                 break;
299         case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
300                 c->status = wreplsrv_pull_table_wait_table_reply(state);
301                 c->state  = COMPOSITE_STATE_DONE;
302                 break;
303         case WREPLSRV_PULL_TABLE_STAGE_DONE:
304                 c->status = NT_STATUS_INTERNAL_ERROR;
305         }
306
307         if (!NT_STATUS_IS_OK(c->status)) {
308                 c->state = COMPOSITE_STATE_ERROR;
309         }
310
311         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
312                 c->async.fn(c);
313         }
314 }
315
316 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
317 {
318         struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
319                                                   struct wreplsrv_pull_table_state);
320         wreplsrv_pull_table_handler(state);
321         return;
322 }
323
324 static void wreplsrv_pull_table_handler_req(struct wrepl_request *req)
325 {
326         struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private,
327                                                   struct wreplsrv_pull_table_state);
328         wreplsrv_pull_table_handler(state);
329         return;
330 }
331
332 static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
333 {
334         struct composite_context *c = NULL;
335         struct wreplsrv_service *service = io->in.partner->service;
336         struct wreplsrv_pull_table_state *state = NULL;
337
338         c = talloc_zero(mem_ctx, struct composite_context);
339         if (!c) goto failed;
340
341         state = talloc_zero(c, struct wreplsrv_pull_table_state);
342         if (!state) goto failed;
343         state->c        = c;
344         state->io       = io;
345
346         c->state        = COMPOSITE_STATE_IN_PROGRESS;
347         c->event_ctx    = service->task->event_ctx;
348         c->private_data = state;
349
350         if (io->in.num_owners) {
351                 state->table_io.out.num_partners        = io->in.num_owners;
352                 state->table_io.out.partners            = io->in.owners;
353                 state->stage                            = WREPLSRV_PULL_TABLE_STAGE_DONE;
354                 composite_done(c);
355                 return c;
356         }
357
358         state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
359         state->creq     = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
360         if (!state->creq) goto failed;
361
362         state->creq->async.fn           = wreplsrv_pull_table_handler_creq;
363         state->creq->async.private_data = state;
364
365         return c;
366 failed:
367         talloc_free(c);
368         return NULL;
369 }
370
371 static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
372                                          struct wreplsrv_pull_table_io *io)
373 {
374         NTSTATUS status;
375
376         status = composite_wait(c);
377
378         if (NT_STATUS_IS_OK(status)) {
379                 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
380                                                           struct wreplsrv_pull_table_state);
381                 io->out.num_owners      = state->table_io.out.num_partners;
382                 io->out.owners          = state->table_io.out.partners;
383                 talloc_reference(mem_ctx, state->table_io.out.partners);
384         }
385
386         talloc_free(c);
387         return status;  
388 }
389
390 struct wreplsrv_pull_names_io {
391         struct {
392                 struct wreplsrv_partner *partner;
393                 struct wreplsrv_out_connection *wreplconn;
394                 struct wrepl_wins_owner owner;
395         } in;
396         struct {
397                 uint32_t num_names;
398                 struct wrepl_name *names;
399         } out;
400 };
401
/* Steps of the "pull names" composite request, in execution order. */
enum wreplsrv_pull_names_stage {
	WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION = 0,	/* connection request pending */
	WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY = 1,	/* names request pending */
	WREPLSRV_PULL_NAMES_STAGE_DONE            = 2	/* nothing left to wait for */
};
407
408 struct wreplsrv_pull_names_state {
409         enum wreplsrv_pull_names_stage stage;
410         struct composite_context *c;
411         struct wrepl_request *req;
412         struct wrepl_pull_names pull_io;
413         struct wreplsrv_pull_names_io *io;
414         struct composite_context *creq;
415         struct wreplsrv_out_connection *wreplconn;
416 };
417
/* Completion callback of the names request (defined further down). */
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
419
420 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
421 {
422         NTSTATUS status;
423
424         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
425         NT_STATUS_NOT_OK_RETURN(status);
426
427         state->pull_io.in.assoc_ctx     = state->wreplconn->assoc_ctx.peer_ctx;
428         state->pull_io.in.partner       = state->io->in.owner;
429         state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io);
430         NT_STATUS_HAVE_NO_MEMORY(state->req);
431
432         state->req->async.fn            = wreplsrv_pull_names_handler_req;
433         state->req->async.private       = state;
434
435         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
436
437         return NT_STATUS_OK;
438 }
439
440 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
441 {
442         NTSTATUS status;
443
444         status = wrepl_pull_names_recv(state->req, state, &state->pull_io);
445         NT_STATUS_NOT_OK_RETURN(status);
446
447         state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
448
449         return NT_STATUS_OK;
450 }
451
452 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
453 {
454         struct composite_context *c = state->c;
455
456         switch (state->stage) {
457         case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
458                 c->status = wreplsrv_pull_names_wait_connection(state);
459                 break;
460         case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
461                 c->status = wreplsrv_pull_names_wait_send_reply(state);
462                 c->state  = COMPOSITE_STATE_DONE;
463                 break;
464         case WREPLSRV_PULL_NAMES_STAGE_DONE:
465                 c->status = NT_STATUS_INTERNAL_ERROR;
466         }
467
468         if (!NT_STATUS_IS_OK(c->status)) {
469                 c->state = COMPOSITE_STATE_ERROR;
470         }
471
472         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
473                 c->async.fn(c);
474         }
475 }
476
477 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
478 {
479         struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
480                                                   struct wreplsrv_pull_names_state);
481         wreplsrv_pull_names_handler(state);
482         return;
483 }
484
485 static void wreplsrv_pull_names_handler_req(struct wrepl_request *req)
486 {
487         struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private,
488                                                   struct wreplsrv_pull_names_state);
489         wreplsrv_pull_names_handler(state);
490         return;
491 }
492
493 static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
494 {
495         struct composite_context *c = NULL;
496         struct wreplsrv_service *service = io->in.partner->service;
497         struct wreplsrv_pull_names_state *state = NULL;
498         enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
499
500         if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
501
502         c = talloc_zero(mem_ctx, struct composite_context);
503         if (!c) goto failed;
504
505         state = talloc_zero(c, struct wreplsrv_pull_names_state);
506         if (!state) goto failed;
507         state->c        = c;
508         state->io       = io;
509
510         c->state        = COMPOSITE_STATE_IN_PROGRESS;
511         c->event_ctx    = service->task->event_ctx;
512         c->private_data = state;
513
514         state->stage    = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
515         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
516         if (!state->creq) goto failed;
517
518         state->creq->async.fn           = wreplsrv_pull_names_handler_creq;
519         state->creq->async.private_data = state;
520
521         return c;
522 failed:
523         talloc_free(c);
524         return NULL;
525 }
526
527 static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
528                                          struct wreplsrv_pull_names_io *io)
529 {
530         NTSTATUS status;
531
532         status = composite_wait(c);
533
534         if (NT_STATUS_IS_OK(status)) {
535                 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
536                                                           struct wreplsrv_pull_names_state);
537                 io->out.num_names       = state->pull_io.out.num_names;
538                 io->out.names           = state->pull_io.out.names;
539                 talloc_reference(mem_ctx, state->pull_io.out.names);
540         }
541
542         talloc_free(c);
543         return status;
544         
545 }
546
/* Steps of a complete pull cycle against one partner. */
enum wreplsrv_pull_cycle_stage {
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY  = 0,	/* owner table request pending */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES = 1,	/* names request for one owner pending */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC   = 2,	/* association stop request pending */
	WREPLSRV_PULL_CYCLE_STAGE_DONE              = 3	/* nothing left to wait for */
};
553
554 struct wreplsrv_pull_cycle_state {
555         enum wreplsrv_pull_cycle_stage stage;
556         struct composite_context *c;
557         struct wreplsrv_pull_cycle_io *io;
558         struct wreplsrv_pull_table_io table_io;
559         uint32_t current;
560         struct wreplsrv_pull_names_io names_io;
561         struct composite_context *creq;
562         struct wrepl_associate_stop assoc_stop_io;
563         struct wrepl_request *req;
564 };
565
/* Completion callbacks (defined further down); both re-enter wreplsrv_pull_cycle_handler(). */
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
568
569 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
570 {
571         struct wreplsrv_owner *current_owner=NULL;
572         struct wreplsrv_owner *local_owner;
573         uint32_t i;
574         uint64_t old_max_version = 0;
575         BOOL do_pull = False;
576
577         for (i=state->current; i < state->table_io.out.num_owners; i++) {
578                 current_owner = wreplsrv_find_owner(state->io->in.partner->service,
579                                                     state->io->in.partner->pull.table,
580                                                     state->table_io.out.owners[i].address);
581
582                 local_owner = wreplsrv_find_owner(state->io->in.partner->service,
583                                                   state->io->in.partner->service->table,
584                                                   state->table_io.out.owners[i].address);
585                 /*
586                  * this means we are ourself the current owner,
587                  * and we don't want replicate ourself
588                  */
589                 if (!current_owner) continue;
590
591                 /*
592                  * this means we don't have any records of this owner
593                  * so fetch them
594                  */
595                 if (!local_owner) {
596                         do_pull         = True;
597                         
598                         break;
599                 }
600
601                 /*
602                  * this means the remote partner has some new records of this owner
603                  * fetch them
604                  */
605                 if (current_owner->owner.max_version > local_owner->owner.max_version) {
606                         do_pull         = True;
607                         old_max_version = local_owner->owner.max_version;
608                         break;
609                 }
610         }
611         state->current = i;
612
613         if (do_pull) {
614                 state->names_io.in.partner              = state->io->in.partner;
615                 state->names_io.in.wreplconn            = state->io->in.wreplconn;
616                 state->names_io.in.owner                = current_owner->owner;
617                 state->names_io.in.owner.min_version    = old_max_version + 1;
618                 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
619                 NT_STATUS_HAVE_NO_MEMORY(state->creq);
620
621                 state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
622                 state->creq->async.private_data = state;
623
624                 return STATUS_MORE_ENTRIES;
625         }
626
627         return NT_STATUS_OK;
628 }
629
630 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
631 {
632         NTSTATUS status;
633
634         status = wreplsrv_pull_cycle_next_owner_do_work(state);
635         if (NT_STATUS_IS_OK(status)) {
636                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
637         } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
638                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
639                 status = NT_STATUS_OK;
640         }
641
642         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
643                 state->assoc_stop_io.in.assoc_ctx       = state->io->in.wreplconn->assoc_ctx.peer_ctx;
644                 state->assoc_stop_io.in.reason          = 0;
645                 state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
646                 NT_STATUS_HAVE_NO_MEMORY(state->req);
647
648                 state->req->async.fn            = wreplsrv_pull_cycle_handler_req;
649                 state->req->async.private       = state;
650
651                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
652         }
653
654         return status;
655 }
656
657 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
658 {
659         NTSTATUS status;
660         uint32_t i;
661
662         status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
663         NT_STATUS_NOT_OK_RETURN(status);
664
665         /* update partner table */
666         for (i=0; i < state->table_io.out.num_owners; i++) {
667                 status = wreplsrv_add_table(state->io->in.partner->service,
668                                             state->io->in.partner, 
669                                             &state->io->in.partner->pull.table,
670                                             state->table_io.out.owners[i].address,
671                                             state->table_io.out.owners[i].max_version);
672                 NT_STATUS_NOT_OK_RETURN(status);
673         }
674
675         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
676         NT_STATUS_NOT_OK_RETURN(status);
677
678         return status;
679 }
680
681 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
682 {
683         NTSTATUS status;
684
685         status = wreplsrv_apply_records(state->io->in.partner,
686                                         &state->names_io.in.owner,
687                                         state->names_io.out.num_names,
688                                         state->names_io.out.names);
689         NT_STATUS_NOT_OK_RETURN(status);
690
691         talloc_free(state->names_io.out.names);
692         ZERO_STRUCT(state->names_io);
693
694         return NT_STATUS_OK;
695 }
696
697 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
698 {
699         NTSTATUS status;
700
701         status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
702         NT_STATUS_NOT_OK_RETURN(status);
703
704         /*
705          * TODO: this should maybe an async call,
706          *       because we may need some network access
707          *       for conflict resolving
708          */
709         status = wreplsrv_pull_cycle_apply_records(state);
710         NT_STATUS_NOT_OK_RETURN(status);
711
712         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
713         NT_STATUS_NOT_OK_RETURN(status);
714
715         return status;
716 }
717
718 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
719 {
720         NTSTATUS status;
721
722         status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
723         NT_STATUS_NOT_OK_RETURN(status);
724
725         state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
726
727         return status;
728 }
729
730 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
731 {
732         struct composite_context *c = state->c;
733
734         switch (state->stage) {
735         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
736                 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
737                 break;
738         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
739                 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
740                 break;
741         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
742                 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
743                 break;
744         case WREPLSRV_PULL_CYCLE_STAGE_DONE:
745                 c->status = NT_STATUS_INTERNAL_ERROR;
746         }
747
748         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
749                 c->state  = COMPOSITE_STATE_DONE;
750         }
751
752         if (!NT_STATUS_IS_OK(c->status)) {
753                 c->state = COMPOSITE_STATE_ERROR;
754         }
755
756         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
757                 c->async.fn(c);
758         }
759 }
760
761 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
762 {
763         struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
764                                                   struct wreplsrv_pull_cycle_state);
765         wreplsrv_pull_cycle_handler(state);
766         return;
767 }
768
769 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
770 {
771         struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private,
772                                                   struct wreplsrv_pull_cycle_state);
773         wreplsrv_pull_cycle_handler(state);
774         return;
775 }
776
777 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
778 {
779         struct composite_context *c = NULL;
780         struct wreplsrv_service *service = io->in.partner->service;
781         struct wreplsrv_pull_cycle_state *state = NULL;
782
783         c = talloc_zero(mem_ctx, struct composite_context);
784         if (!c) goto failed;
785
786         state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
787         if (!state) goto failed;
788         state->c        = c;
789         state->io       = io;
790
791         c->state        = COMPOSITE_STATE_IN_PROGRESS;
792         c->event_ctx    = service->task->event_ctx;
793         c->private_data = state;
794
795         state->stage    = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
796         state->table_io.in.partner      = io->in.partner;
797         state->table_io.in.num_owners   = io->in.num_owners;
798         state->table_io.in.owners       = io->in.owners;
799         state->creq = wreplsrv_pull_table_send(state, &state->table_io);
800         if (!state->creq) goto failed;
801
802         state->creq->async.fn           = wreplsrv_pull_cycle_handler_creq;
803         state->creq->async.private_data = state;
804
805         return c;
806 failed:
807         talloc_free(c);
808         return NULL;
809 }
810
811 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
812 {
813         NTSTATUS status;
814
815         status = composite_wait(c);
816
817         talloc_free(c);
818         return status;
819 }
820
/* stages of the push notify state machine */
enum wreplsrv_push_notify_stage {
        /* the outgoing connect request has been started, not yet received */
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,

        /* an inform request has been sent and its completion is pending */
        WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,

        /* nothing more to do */
        WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};
826
827 struct wreplsrv_push_notify_state {
828         enum wreplsrv_push_notify_stage stage;
829         struct composite_context *c;
830         struct wreplsrv_push_notify_io *io;
831         enum wrepl_replication_cmd command;
832         BOOL full_table;
833         struct wrepl_send_ctrl ctrl;
834         struct wrepl_request *req;
835         struct wrepl_packet req_packet;
836         struct wrepl_packet *rep_packet;
837         struct composite_context *creq;
838         struct wreplsrv_out_connection *wreplconn;
839 };
840
/*
 * completion callbacks of the push notify state machine,
 * declared here because they are referenced before their definition
 */
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
843
844 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
845 {
846         struct wreplsrv_service *service = state->io->in.partner->service;
847         struct wrepl_packet *req = &state->req_packet;
848         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
849         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
850         struct wreplsrv_in_connection *wrepl_in;
851         NTSTATUS status;
852         struct socket_context *sock;
853         struct packet_context *packet;
854         uint16_t fde_flags;
855
856         /* prepare the outgoing request */
857         req->opcode     = WREPL_OPCODE_BITS;
858         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
859         req->mess_type  = WREPL_REPLICATION;
860
861         repl_out->command = state->command;
862
863         status = wreplsrv_fill_wrepl_table(service, state, table_out,
864                                            service->wins_db->local_owner, state->full_table);
865         NT_STATUS_NOT_OK_RETURN(status);
866
867         /* queue the request */
868         state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
869         NT_STATUS_HAVE_NO_MEMORY(state->req);
870
871         /*
872          * now we need to convert the wrepl_socket (client connection)
873          * into a wreplsrv_in_connection (server connection), because
874          * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
875          * message is received by the peer.
876          */
877
878         /* steal the socket_context */
879         sock = state->wreplconn->sock->sock;
880         state->wreplconn->sock->sock = NULL;
881         talloc_steal(state, sock);
882
883         /* 
884          * steal the packet_context
885          * note the request DATA_BLOB we just send on the
886          * wrepl_socket (client connection) is still unter the 
887          * packet context and will be send to the wire
888          */
889         packet = state->wreplconn->sock->packet;
890         state->wreplconn->sock->packet = NULL;
891         talloc_steal(state, packet);
892
893         /*
894          * get the fde_flags of the old fde event,
895          * so that we can later set the same flags to the new one
896          */
897         fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde);
898
899         /*
900          * free the wrepl_socket (client connection)
901          */
902         talloc_free(state->wreplconn->sock);
903         state->wreplconn->sock = NULL;
904
905         /*
906          * now create a wreplsrv_in_connection,
907          * on which we act as server
908          *
909          * NOTE: sock and packet will be stolen by
910          *       wreplsrv_in_connection_merge()
911          */
912         status = wreplsrv_in_connection_merge(state->io->in.partner,
913                                               sock, packet, &wrepl_in);
914         NT_STATUS_NOT_OK_RETURN(status);
915
916         event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags);
917
918         wrepl_in->assoc_ctx.peer_ctx    = state->wreplconn->assoc_ctx.peer_ctx;
919         wrepl_in->assoc_ctx.our_ctx     = 0;
920
921         /* now we can free the wreplsrv_out_connection */
922         talloc_free(state->wreplconn);
923         state->wreplconn = NULL;
924
925         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
926
927         return NT_STATUS_OK;
928 }
929
930 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
931 {
932         struct wreplsrv_service *service = state->io->in.partner->service;
933         struct wrepl_packet *req = &state->req_packet;
934         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
935         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
936         NTSTATUS status;
937
938         req->opcode     = WREPL_OPCODE_BITS;
939         req->assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
940         req->mess_type  = WREPL_REPLICATION;
941
942         repl_out->command = state->command;
943
944         status = wreplsrv_fill_wrepl_table(service, state, table_out,
945                                            service->wins_db->local_owner, state->full_table);
946         NT_STATUS_NOT_OK_RETURN(status);
947
948         /* we won't get a reply to a inform message */
949         state->ctrl.send_only           = True;
950
951         state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
952         NT_STATUS_HAVE_NO_MEMORY(state->req);
953
954         state->req->async.fn            = wreplsrv_push_notify_handler_req;
955         state->req->async.private       = state;
956
957         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
958
959         return NT_STATUS_OK;
960 }
961
962 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
963 {
964         NTSTATUS status;
965
966         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
967         NT_STATUS_NOT_OK_RETURN(status);
968
969         switch (state->command) {
970         case WREPL_REPL_UPDATE:
971                 state->full_table = True;
972                 return wreplsrv_push_notify_update(state);
973         case WREPL_REPL_UPDATE2:
974                 state->full_table = False;
975                 return wreplsrv_push_notify_update(state);
976         case WREPL_REPL_INFORM:
977                 state->full_table = True;
978                 return wreplsrv_push_notify_inform(state);
979         case WREPL_REPL_INFORM2:
980                 state->full_table = False;
981                 return wreplsrv_push_notify_inform(state);
982         default:
983                 return NT_STATUS_INTERNAL_ERROR;
984         }
985
986         return NT_STATUS_INTERNAL_ERROR;
987 }
988
989 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
990 {
991         NTSTATUS status;
992
993         status =  wrepl_request_recv(state->req, state, NULL);
994         NT_STATUS_NOT_OK_RETURN(status);
995
996         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
997         return status;
998 }
999
1000 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1001 {
1002         struct composite_context *c = state->c;
1003
1004         switch (state->stage) {
1005         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1006                 c->status = wreplsrv_push_notify_wait_connect(state);
1007                 break;
1008         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1009                 c->status = wreplsrv_push_notify_wait_inform(state);
1010                 break;
1011         case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1012                 c->status = NT_STATUS_INTERNAL_ERROR;
1013         }
1014
1015         if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1016                 c->state  = COMPOSITE_STATE_DONE;
1017         }
1018
1019         if (!NT_STATUS_IS_OK(c->status)) {
1020                 c->state = COMPOSITE_STATE_ERROR;
1021         }
1022
1023         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1024                 c->async.fn(c);
1025         }
1026 }
1027
1028 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1029 {
1030         struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1031                                                    struct wreplsrv_push_notify_state);
1032         wreplsrv_push_notify_handler(state);
1033         return;
1034 }
1035
1036 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1037 {
1038         struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
1039                                                    struct wreplsrv_push_notify_state);
1040         wreplsrv_push_notify_handler(state);
1041         return;
1042 }
1043
1044 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1045 {
1046         struct composite_context *c = NULL;
1047         struct wreplsrv_service *service = io->in.partner->service;
1048         struct wreplsrv_push_notify_state *state = NULL;
1049         enum winsrepl_partner_type partner_type;
1050
1051         c = talloc_zero(mem_ctx, struct composite_context);
1052         if (!c) goto failed;
1053
1054         state = talloc_zero(c, struct wreplsrv_push_notify_state);
1055         if (!state) goto failed;
1056         state->c        = c;
1057         state->io       = io;
1058
1059         if (io->in.inform) {
1060                 /* we can cache the connection in partner->push->wreplconn */
1061                 partner_type = WINSREPL_PARTNER_PUSH;
1062                 if (io->in.propagate) {
1063                         state->command  = WREPL_REPL_INFORM2;
1064                 } else {
1065                         state->command  = WREPL_REPL_INFORM;
1066                 }
1067         } else {
1068                 /* we can NOT cache the connection */
1069                 partner_type = WINSREPL_PARTNER_NONE;
1070                 if (io->in.propagate) {
1071                         state->command  = WREPL_REPL_UPDATE2;
1072                 } else {
1073                         state->command  = WREPL_REPL_UPDATE;
1074                 }       
1075         }
1076
1077         c->state        = COMPOSITE_STATE_IN_PROGRESS;
1078         c->event_ctx    = service->task->event_ctx;
1079         c->private_data = state;
1080
1081         state->stage    = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1082         state->creq     = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1083         if (!state->creq) goto failed;
1084
1085         state->creq->async.fn           = wreplsrv_push_notify_handler_creq;
1086         state->creq->async.private_data = state;
1087
1088         return c;
1089 failed:
1090         talloc_free(c);
1091         return NULL;
1092 }
1093
1094 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1095 {
1096         NTSTATUS status;
1097
1098         status = composite_wait(c);
1099
1100         talloc_free(c);
1101         return status;
1102 }