r20709: pass a repsFromTo1 struct down as it contains all needed info for the source dsa
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb
30  *
31  *  Component: ldb repl_meta_data module
32  *
33  *  Description: - add a unique objectGUID onto every new record,
34  *               - handle whenCreated, whenChanged timestamps
35  *               - handle uSNCreated, uSNChanged numbers
36  *               - handle replPropertyMetaData attribute
37  *
38  *  Author: Simo Sorce
39  *  Author: Stefan Metzmacher
40  */
41
42 #include "includes.h"
43 #include "lib/ldb/include/ldb.h"
44 #include "lib/ldb/include/ldb_errors.h"
45 #include "lib/ldb/include/ldb_private.h"
46 #include "dsdb/samdb/samdb.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50
51 struct replmd_replicated_request {
52         struct ldb_module *module;
53         struct ldb_handle *handle;
54         struct ldb_request *orig_req;
55
56         struct dsdb_extended_replicated_objects *objs;
57
58         uint32_t index_current;
59
60         struct {
61                 TALLOC_CTX *mem_ctx;
62                 struct ldb_request *search_req;
63                 struct ldb_message *search_msg;
64                 int search_ret;
65                 struct ldb_request *change_req;
66                 int change_ret;
67         } sub;
68 };
69
70 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
71                                                                        struct ldb_request *req,
72                                                                        struct dsdb_extended_replicated_objects *objs)
73 {
74         struct replmd_replicated_request *ar;
75         struct ldb_handle *h;
76
77         h = talloc_zero(req, struct ldb_handle);
78         if (h == NULL) {
79                 ldb_set_errstring(module->ldb, "Out of Memory");
80                 return NULL;
81         }
82
83         h->module       = module;
84         h->state        = LDB_ASYNC_PENDING;
85         h->status       = LDB_SUCCESS;
86
87         ar = talloc_zero(h, struct replmd_replicated_request);
88         if (ar == NULL) {
89                 ldb_set_errstring(module->ldb, "Out of Memory");
90                 talloc_free(h);
91                 return NULL;
92         }
93
94         h->private_data = ar;
95
96         ar->module      = module;
97         ar->handle      = h;
98         ar->orig_req    = req;
99         ar->objs        = objs;
100
101         req->handle = h;
102
103         return ar;
104 }
105
106 static struct ldb_message_element *replmd_find_attribute(const struct ldb_message *msg, const char *name)
107 {
108         int i;
109
110         for (i = 0; i < msg->num_elements; i++) {
111                 if (ldb_attr_cmp(name, msg->elements[i].name) == 0) {
112                         return &msg->elements[i];
113                 }
114         }
115
116         return NULL;
117 }
118
119 /*
120   add a time element to a record
121 */
122 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
123 {
124         struct ldb_message_element *el;
125         char *s;
126
127         if (ldb_msg_find_element(msg, attr) != NULL) {
128                 return 0;
129         }
130
131         s = ldb_timestring(msg, t);
132         if (s == NULL) {
133                 return -1;
134         }
135
136         if (ldb_msg_add_string(msg, attr, s) != 0) {
137                 return -1;
138         }
139
140         el = ldb_msg_find_element(msg, attr);
141         /* always set as replace. This works because on add ops, the flag
142            is ignored */
143         el->flags = LDB_FLAG_MOD_REPLACE;
144
145         return 0;
146 }
147
148 /*
149   add a uint64_t element to a record
150 */
151 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
152 {
153         struct ldb_message_element *el;
154
155         if (ldb_msg_find_element(msg, attr) != NULL) {
156                 return 0;
157         }
158
159         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
160                 return -1;
161         }
162
163         el = ldb_msg_find_element(msg, attr);
164         /* always set as replace. This works because on add ops, the flag
165            is ignored */
166         el->flags = LDB_FLAG_MOD_REPLACE;
167
168         return 0;
169 }
170
171 static int replmd_add_replicated(struct ldb_module *module, struct ldb_request *req, struct ldb_control *ctrl)
172 {
173         struct ldb_control **saved_ctrls;
174         int ret;
175
176         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_replicated\n");
177
178         if (!save_controls(ctrl, req, &saved_ctrls)) {
179                 return LDB_ERR_OPERATIONS_ERROR;
180         }
181
182         ret = ldb_next_request(module, req);
183         req->controls = saved_ctrls;
184
185         return ret;
186 }
187
188 static int replmd_add_originating(struct ldb_module *module, struct ldb_request *req)
189 {
190         struct ldb_request *down_req;
191         struct ldb_message_element *attribute;
192         struct ldb_message *msg;
193         struct ldb_val v;
194         struct GUID guid;
195         uint64_t seq_num;
196         NTSTATUS nt_status;
197         int ret;
198         time_t t = time(NULL);
199
200         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
201
202         if ((attribute = replmd_find_attribute(req->op.add.message, "objectGUID")) != NULL ) {
203                 return ldb_next_request(module, req);
204         }
205
206         down_req = talloc(req, struct ldb_request);
207         if (down_req == NULL) {
208                 return LDB_ERR_OPERATIONS_ERROR;
209         }
210
211         *down_req = *req;
212
213         /* we have to copy the message as the caller might have it as a const */
214         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
215         if (msg == NULL) {
216                 talloc_free(down_req);
217                 return LDB_ERR_OPERATIONS_ERROR;
218         }
219
220         /* a new GUID */
221         guid = GUID_random();
222
223         nt_status = ndr_push_struct_blob(&v, msg, &guid, 
224                                          (ndr_push_flags_fn_t)ndr_push_GUID);
225         if (!NT_STATUS_IS_OK(nt_status)) {
226                 talloc_free(down_req);
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         ret = ldb_msg_add_value(msg, "objectGUID", &v, NULL);
231         if (ret) {
232                 talloc_free(down_req);
233                 return ret;
234         }
235         
236         if (add_time_element(msg, "whenCreated", t) != 0 ||
237             add_time_element(msg, "whenChanged", t) != 0) {
238                 talloc_free(down_req);
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         /* Get a sequence number from the backend */
243         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
244         if (ret == LDB_SUCCESS) {
245                 if (add_uint64_element(msg, "uSNCreated", seq_num) != 0 ||
246                     add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
247                         talloc_free(down_req);
248                         return LDB_ERR_OPERATIONS_ERROR;
249                 }
250         }
251
252         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
253
254         /* go on with the call chain */
255         ret = ldb_next_request(module, down_req);
256
257         /* do not free down_req as the call results may be linked to it,
258          * it will be freed when the upper level request get freed */
259         if (ret == LDB_SUCCESS) {
260                 req->handle = down_req->handle;
261         }
262
263         return ret;
264 }
265
266 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
267 {
268         struct ldb_control *ctrl;
269
270         /* do not manipulate our control entries */
271         if (ldb_dn_is_special(req->op.add.message->dn)) {
272                 return ldb_next_request(module, req);
273         }
274
275         ctrl = get_control_from_list(req->controls, DSDB_CONTROL_REPLICATED_OBJECT_OID);
276         if (ctrl) {
277                 /* handle replicated objects different */
278                 return replmd_add_replicated(module, req, ctrl);
279         }
280
281         return replmd_add_originating(module, req);
282 }
283
284 static int replmd_modify_replicated(struct ldb_module *module, struct ldb_request *req, struct ldb_control *ctrl)
285 {
286         struct ldb_control **saved_ctrls;
287         int ret;
288
289         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_replicated\n");
290
291         if (!save_controls(ctrl, req, &saved_ctrls)) {
292                 return LDB_ERR_OPERATIONS_ERROR;
293         }
294
295         ret = ldb_next_request(module, req);
296         req->controls = saved_ctrls;
297
298         return ret;
299 }
300
301 static int replmd_modify_originating(struct ldb_module *module, struct ldb_request *req)
302 {
303         struct ldb_request *down_req;
304         struct ldb_message *msg;
305         int ret;
306         time_t t = time(NULL);
307         uint64_t seq_num;
308
309         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
310
311         down_req = talloc(req, struct ldb_request);
312         if (down_req == NULL) {
313                 return LDB_ERR_OPERATIONS_ERROR;
314         }
315
316         *down_req = *req;
317
318         /* we have to copy the message as the caller might have it as a const */
319         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
320         if (msg == NULL) {
321                 talloc_free(down_req);
322                 return LDB_ERR_OPERATIONS_ERROR;
323         }
324
325         if (add_time_element(msg, "whenChanged", t) != 0) {
326                 talloc_free(down_req);
327                 return LDB_ERR_OPERATIONS_ERROR;
328         }
329
330         /* Get a sequence number from the backend */
331         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
332         if (ret == LDB_SUCCESS) {
333                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
334                         talloc_free(down_req);
335                         return LDB_ERR_OPERATIONS_ERROR;
336                 }
337         }
338
339         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
340
341         /* go on with the call chain */
342         ret = ldb_next_request(module, down_req);
343
344         /* do not free down_req as the call results may be linked to it,
345          * it will be freed when the upper level request get freed */
346         if (ret == LDB_SUCCESS) {
347                 req->handle = down_req->handle;
348         }
349
350         return ret;
351 }
352
353 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
354 {
355         struct ldb_control *ctrl;
356
357         /* do not manipulate our control entries */
358         if (ldb_dn_is_special(req->op.mod.message->dn)) {
359                 return ldb_next_request(module, req);
360         }
361
362         ctrl = get_control_from_list(req->controls, DSDB_CONTROL_REPLICATED_OBJECT_OID);
363         if (ctrl) {
364                 /* handle replicated objects different */
365                 return replmd_modify_replicated(module, req, ctrl);
366         }
367
368         return replmd_modify_originating(module, req);
369 }
370
371 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
372 {
373         struct ldb_reply *ares = NULL;
374
375         ar->handle->status = ret;
376         ar->handle->state = LDB_ASYNC_DONE;
377
378         if (!ar->orig_req->callback) {
379                 return LDB_SUCCESS;
380         }
381         
382         /* we're done and need to report the success to the caller */
383         ares = talloc_zero(ar, struct ldb_reply);
384         if (!ares) {
385                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
386                 ar->handle->state = LDB_ASYNC_DONE;
387                 return LDB_ERR_OPERATIONS_ERROR;
388         }
389
390         ares->type      = LDB_REPLY_EXTENDED;
391         ares->response  = NULL;
392
393         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
394 }
395
396 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
397 {
398         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
399 }
400
401 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
402 {
403         return replmd_replicated_request_reply_helper(ar, ret);
404 }
405
406 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
407 {
408         int ret = LDB_ERR_OTHER;
409         /* TODO: do some error mapping */
410         return replmd_replicated_request_reply_helper(ar, ret);
411 }
412
413 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
414
415 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
416                                                 void *private_data,
417                                                 struct ldb_reply *ares)
418 {
419 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
420         struct replmd_replicated_request *ar = talloc_get_type(private_data,
421                                                struct replmd_replicated_request);
422
423         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
424         if (ar->sub.change_ret != LDB_SUCCESS) {
425                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
426         }
427
428         talloc_free(ar->sub.mem_ctx);
429         ZERO_STRUCT(ar->sub);
430
431         ar->index_current++;
432
433         return replmd_replicated_apply_next(ar);
434 #else
435         return LDB_SUCCESS;
436 #endif
437 }
438
439 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
440 {
441         NTSTATUS nt_status;
442         struct ldb_message *msg;
443         struct replPropertyMetaDataBlob *md;
444         struct ldb_val md_value;
445         uint32_t i;
446         uint64_t seq_num;
447         int ret;
448
449         msg = ar->objs->objects[ar->index_current].msg;
450         md = ar->objs->objects[ar->index_current].meta_data;
451
452         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
453         if (ret != LDB_SUCCESS) {
454                 return replmd_replicated_request_error(ar, ret);
455         }
456
457         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
458         if (ret != LDB_SUCCESS) {
459                 return replmd_replicated_request_error(ar, ret);
460         }
461
462         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
463         if (ret != LDB_SUCCESS) {
464                 return replmd_replicated_request_error(ar, ret);
465         }
466
467         md = ar->objs->objects[ar->index_current].meta_data;
468         for (i=0; i < md->ctr.ctr1.count; i++) {
469                 md->ctr.ctr1.array[i].local_usn = seq_num;
470         }
471         nt_status = ndr_push_struct_blob(&md_value, msg, md,
472                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
473         if (!NT_STATUS_IS_OK(nt_status)) {
474                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
475         }
476         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
477         if (ret != LDB_SUCCESS) {
478                 return replmd_replicated_request_error(ar, ret);
479         }
480
481         ret = ldb_build_add_req(&ar->sub.change_req,
482                                 ar->module->ldb,
483                                 ar->sub.mem_ctx,
484                                 msg,
485                                 NULL,
486                                 ar,
487                                 replmd_replicated_apply_add_callback);
488         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
489
490 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
491         return ldb_next_request(ar->module, ar->sub.change_req);
492 #else
493         ret = ldb_next_request(ar->module, ar->sub.change_req);
494         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
495
496         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
497         if (ar->sub.change_ret != LDB_SUCCESS) {
498                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
499         }
500
501         talloc_free(ar->sub.mem_ctx);
502         ZERO_STRUCT(ar->sub);
503
504         ar->index_current++;
505
506         return LDB_SUCCESS;
507 #endif
508 }
509
510 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
511 {
512 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
513 #error sorry replmd_replicated_apply_merge not implemented
514 #else
515         ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL,
516                   "replmd_replicated_apply_merge: ignore [%u]\n",
517                   ar->index_current);
518
519         talloc_free(ar->sub.mem_ctx);
520         ZERO_STRUCT(ar->sub);
521
522         ar->index_current++;
523
524         return LDB_SUCCESS;
525 #endif
526 }
527
528 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
529                                                    void *private_data,
530                                                    struct ldb_reply *ares)
531 {
532         struct replmd_replicated_request *ar = talloc_get_type(private_data,
533                                                struct replmd_replicated_request);
534         bool is_done = false;
535
536         switch (ares->type) {
537         case LDB_REPLY_ENTRY:
538                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
539                 break;
540         case LDB_REPLY_REFERRAL:
541                 /* we ignore referrals */
542                 break;
543         case LDB_REPLY_EXTENDED:
544         case LDB_REPLY_DONE:
545                 is_done = true;
546         }
547
548         talloc_free(ares);
549
550 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
551         if (is_done) {
552                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
553                 if (ar->sub.search_ret != LDB_SUCCESS) {
554                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
555                 }
556                 if (ar->sub.search_msg) {
557                         return replmd_replicated_apply_merge(ar);
558                 }
559                 return replmd_replicated_apply_add(ar);
560         }
561 #endif
562         return LDB_SUCCESS;
563 }
564
565 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
566 {
567         int ret;
568         char *tmp_str;
569         char *filter;
570
571         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
572         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
573
574         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
575         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
576         talloc_free(tmp_str);
577
578         ret = ldb_build_search_req(&ar->sub.search_req,
579                                    ar->module->ldb,
580                                    ar->sub.mem_ctx,
581                                    ar->objs->partition_dn,
582                                    LDB_SCOPE_SUBTREE,
583                                    filter,
584                                    NULL,
585                                    NULL,
586                                    ar,
587                                    replmd_replicated_apply_search_callback);
588         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
589
590 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
591         return ldb_next_request(ar->module, ar->sub.search_req);
592 #else
593         ret = ldb_next_request(ar->module, ar->sub.search_req);
594         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
595
596         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
597         if (ar->sub.search_ret != LDB_SUCCESS) {
598                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
599         }
600         if (ar->sub.search_msg) {
601                 return replmd_replicated_apply_merge(ar);
602         }
603
604         return replmd_replicated_apply_add(ar);
605 #endif
606 }
607
608 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
609 {
610 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
611         if (ar->index_current >= ar->objs->num_objects) {
612                 return replmd_replicated_uptodate_vector(ar);
613         }
614 #endif
615
616         ar->sub.mem_ctx = talloc_new(ar);
617         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
618
619         return replmd_replicated_apply_search(ar);
620 }
621
622 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
623                                                       void *private_data,
624                                                       struct ldb_reply *ares)
625 {
626 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
627         struct replmd_replicated_request *ar = talloc_get_type(private_data,
628                                                struct replmd_replicated_request);
629
630         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
631         if (ar->sub.change_ret != LDB_SUCCESS) {
632                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
633         }
634
635         talloc_free(ar->sub.mem_ctx);
636         ZERO_STRUCT(ar->sub);
637
638         return replmd_replicated_request_done(ar);
639 #else
640         return LDB_SUCCESS;
641 #endif
642 }
643
644 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
645 {
646         NTSTATUS nt_status;
647         struct ldb_message *msg;
648         struct replUpToDateVectorBlob ouv;
649         const struct ldb_val *ouv_value;
650         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
651         struct replUpToDateVectorBlob nuv;
652         struct ldb_val nuv_value;
653         struct ldb_message_element *nuv_el = NULL;
654         struct GUID *our_invocation_id;
655         uint32_t i,j,ni=0;
656         uint64_t seq_num;
657         bool found = false;
658         time_t t = time(NULL);
659         NTTIME now;
660         int ret;
661
662         ruv = ar->objs->uptodateness_vector;
663         ZERO_STRUCT(ouv);
664         ouv.version = 2;
665         ZERO_STRUCT(nuv);
666         nuv.version = 2;
667
668         unix_to_nt_time(&now, t);
669
670         /* 
671          * we use the next sequence number for our own highest_usn
672          * because we will do a modify request and this will increment
673          * our highest_usn
674          */
675         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
676         if (ret != LDB_SUCCESS) {
677                 return replmd_replicated_request_error(ar, ret);
678         }
679
680         /*
681          * first create the new replUpToDateVector
682          */
683         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
684         if (ouv_value) {
685                 nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
686                                                  (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
687                 if (!NT_STATUS_IS_OK(nt_status)) {
688                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
689                 }
690
691                 if (ouv.version != 2) {
692                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
693                 }
694         }
695
696         /*
697          * the new uptodateness vector will at least
698          * contain 2 entries, one for the source_dsa and one the local server
699          *
700          * plus optional values from our old vector and the one from the source_dsa
701          */
702         nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
703         if (ruv) nuv.ctr.ctr2.count += ruv->count;
704         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
705                                             struct drsuapi_DsReplicaCursor2,
706                                             nuv.ctr.ctr2.count);
707         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
708
709         /* first copy the old vector */
710         for (i=0; i < ouv.ctr.ctr2.count; i++) {
711                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
712                 ni++;
713         }
714
715         /* merge in the source_dsa vector is available */
716         for (i=0; (ruv && i < ruv->count); i++) {
717                 found = false;
718
719                 for (j=0; j < ni; j++) {
720                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
721                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
722                                 continue;
723                         }
724
725                         found = true;
726
727                         /*
728                          * we update only the highest_usn and not the latest_sync_success time,
729                          * because the last success stands for direct replication
730                          */
731                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
732                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
733                         }
734                         break;                  
735                 }
736
737                 if (found) continue;
738
739                 /* if it's not there yet, add it */
740                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
741                 ni++;
742         }
743
744         /*
745          * merge in the current highwatermark for the source_dsa
746          */
747         found = false;
748         for (j=0; j < ni; j++) {
749                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
750                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
751                         continue;
752                 }
753
754                 found = true;
755
756                 /*
757                  * here we update the highest_usn and last_sync_success time
758                  * because we're directly replicating from the source_dsa
759                  *
760                  * and use the tmp_highest_usn because this is what we have just applied
761                  * to our ldb
762                  */
763                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
764                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
765                 break;
766         }
767         if (!found) {
768                 /*
769                  * here we update the highest_usn and last_sync_success time
770                  * because we're directly replicating from the source_dsa
771                  *
772                  * and use the tmp_highest_usn because this is what we have just applied
773                  * to our ldb
774                  */
775                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
776                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
777                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
778                 ni++;
779         }
780
781         /*
782          * merge our own current values if we have a invocation_id already
783          * attached to the ldb
784          */
785         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
786         if (our_invocation_id) {
787                 found = false;
788                 for (j=0; j < ni; j++) {
789                         if (!GUID_equal(our_invocation_id,
790                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
791                                 continue;
792                         }
793
794                         found = true;
795
796                         /*
797                          * here we update the highest_usn and last_sync_success time
798                          * because it's our own entry
799                          */
800                         nuv.ctr.ctr2.cursors[j].highest_usn             = seq_num;
801                         nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
802                         break;
803                 }
804                 if (!found) {
805                         /*
806                          * here we update the highest_usn and last_sync_success time
807                          * because it's our own entry
808                          */
809                         nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
810                         nuv.ctr.ctr2.cursors[ni].highest_usn            = seq_num;
811                         nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
812                         ni++;
813                 }
814         }
815
816         /*
817          * finally correct the size of the cursors array
818          */
819         nuv.ctr.ctr2.count = ni;
820
821         /*
822          * create the change ldb_message
823          */
824         msg = ldb_msg_new(ar->sub.mem_ctx);
825         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
826         msg->dn = ar->sub.search_msg->dn;
827
828         nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
829                                          (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
830         if (!NT_STATUS_IS_OK(nt_status)) {
831                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
832         }
833         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
834         if (ret != LDB_SUCCESS) {
835                 return replmd_replicated_request_error(ar, ret);
836         }
837         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
838
839         ret = ldb_build_mod_req(&ar->sub.change_req,
840                                 ar->module->ldb,
841                                 ar->sub.mem_ctx,
842                                 msg,
843                                 NULL,
844                                 ar,
845                                 replmd_replicated_uptodate_modify_callback);
846         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
847
848 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
849         return ldb_next_request(ar->module, ar->sub.change_req);
850 #else
851         ret = ldb_next_request(ar->module, ar->sub.change_req);
852         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
853
854         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
855         if (ar->sub.change_ret != LDB_SUCCESS) {
856                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
857         }
858
859         talloc_free(ar->sub.mem_ctx);
860         ZERO_STRUCT(ar->sub);
861
862         return replmd_replicated_request_done(ar);
863 #endif
864 }
865
866 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
867                                                       void *private_data,
868                                                       struct ldb_reply *ares)
869 {
870         struct replmd_replicated_request *ar = talloc_get_type(private_data,
871                                                struct replmd_replicated_request);
872         bool is_done = false;
873
874         switch (ares->type) {
875         case LDB_REPLY_ENTRY:
876                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
877                 break;
878         case LDB_REPLY_REFERRAL:
879                 /* we ignore referrals */
880                 break;
881         case LDB_REPLY_EXTENDED:
882         case LDB_REPLY_DONE:
883                 is_done = true;
884         }
885
886         talloc_free(ares);
887
888 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
889         if (is_done) {
890                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
891                 if (ar->sub.search_ret != LDB_SUCCESS) {
892                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
893                 }
894                 if (!ar->sub.search_msg) {
895                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
896                 }
897
898                 return replmd_replicated_uptodate_modify(ar);
899         }
900 #endif
901         return LDB_SUCCESS;
902 }
903
904 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
905 {
906         int ret;
907         static const char *attrs[] = {
908                 "replUpToDateVector",
909                 NULL
910         };
911
912         ret = ldb_build_search_req(&ar->sub.search_req,
913                                    ar->module->ldb,
914                                    ar->sub.mem_ctx,
915                                    ar->objs->partition_dn,
916                                    LDB_SCOPE_BASE,
917                                    "(objectClass=*)",
918                                    attrs,
919                                    NULL,
920                                    ar,
921                                    replmd_replicated_uptodate_search_callback);
922         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
923
924 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
925         return ldb_next_request(ar->module, ar->sub.search_req);
926 #else
927         ret = ldb_next_request(ar->module, ar->sub.search_req);
928         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
929
930         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
931         if (ar->sub.search_ret != LDB_SUCCESS) {
932                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
933         }
934         if (!ar->sub.search_msg) {
935                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
936         }
937
938         return replmd_replicated_uptodate_modify(ar);
939 #endif
940 }
941
942 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
943 {
944         ar->sub.mem_ctx = talloc_new(ar);
945         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
946
947         return replmd_replicated_uptodate_search(ar);
948 }
949
950 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
951 {
952         struct dsdb_extended_replicated_objects *objs;
953         struct replmd_replicated_request *ar;
954
955         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
956
957         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
958         if (!objs) {
959                 return LDB_ERR_PROTOCOL_ERROR;
960         }
961
962         ar = replmd_replicated_init_handle(module, req, objs);
963         if (!ar) {
964                 return LDB_ERR_OPERATIONS_ERROR;
965         }
966
967 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
968         return replmd_replicated_apply_next(ar);
969 #else
970         while (ar->index_current < ar->objs->num_objects &&
971                req->handle->state != LDB_ASYNC_DONE) { 
972                 replmd_replicated_apply_next(ar);
973         }
974
975         if (req->handle->state != LDB_ASYNC_DONE) {
976                 replmd_replicated_uptodate_vector(ar);
977         }
978
979         return LDB_SUCCESS;
980 #endif
981 }
982
983 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
984 {
985         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
986                 return replmd_extended_replicated_objects(module, req);
987         }
988
989         return ldb_next_request(module, req);
990 }
991
992 static int replmd_wait_none(struct ldb_handle *handle) {
993         struct replmd_replicated_request *ar;
994     
995         if (!handle || !handle->private_data) {
996                 return LDB_ERR_OPERATIONS_ERROR;
997         }
998
999         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1000         if (!ar) {
1001                 return LDB_ERR_OPERATIONS_ERROR;
1002         }
1003
1004         /* we do only sync calls */
1005         if (handle->state != LDB_ASYNC_DONE) {
1006                 return LDB_ERR_OPERATIONS_ERROR;
1007         }
1008
1009         return handle->status;
1010 }
1011
1012 static int replmd_wait_all(struct ldb_handle *handle) {
1013
1014         int ret;
1015
1016         while (handle->state != LDB_ASYNC_DONE) {
1017                 ret = replmd_wait_none(handle);
1018                 if (ret != LDB_SUCCESS) {
1019                         return ret;
1020                 }
1021         }
1022
1023         return handle->status;
1024 }
1025
1026 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1027 {
1028         if (type == LDB_WAIT_ALL) {
1029                 return replmd_wait_all(handle);
1030         } else {
1031                 return replmd_wait_none(handle);
1032         }
1033 }
1034
1035 static const struct ldb_module_ops replmd_ops = {
1036         .name          = "repl_meta_data",
1037         .add           = replmd_add,
1038         .modify        = replmd_modify,
1039         .extended      = replmd_extended,
1040         .wait          = replmd_wait
1041 };
1042
1043 int repl_meta_data_module_init(void)
1044 {
1045         return ldb_register_module(&replmd_ops);
1046 }