Fix use of some modules (needed _PUBLIC_).
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb repl_meta_data module
31  *
32  *  Description: - add a unique objectGUID onto every new record,
33  *               - handle whenCreated, whenChanged timestamps
34  *               - handle uSNCreated, uSNChanged numbers
35  *               - handle replPropertyMetaData attribute
36  *
37  *  Author: Simo Sorce
38  *  Author: Stefan Metzmacher
39  */
40
41 #include "includes.h"
42 #include "lib/ldb/include/ldb.h"
43 #include "lib/ldb/include/ldb_errors.h"
44 #include "lib/ldb/include/ldb_private.h"
45 #include "dsdb/samdb/samdb.h"
46 #include "dsdb/common/flags.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50 #include "param/param.h"
51
52 struct replmd_replicated_request {
53         struct ldb_module *module;
54         struct ldb_handle *handle;
55         struct ldb_request *orig_req;
56
57         const struct dsdb_schema *schema;
58
59         struct dsdb_extended_replicated_objects *objs;
60
61         uint32_t index_current;
62
63         struct {
64                 TALLOC_CTX *mem_ctx;
65                 struct ldb_request *search_req;
66                 struct ldb_message *search_msg;
67                 int search_ret;
68                 struct ldb_request *change_req;
69                 int change_ret;
70         } sub;
71 };
72
73 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
74                                                                        struct ldb_request *req,
75                                                                        struct dsdb_extended_replicated_objects *objs)
76 {
77         struct replmd_replicated_request *ar;
78         struct ldb_handle *h;
79         const struct dsdb_schema *schema;
80
81         schema = dsdb_get_schema(module->ldb);
82         if (!schema) {
83                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
84                               "replmd_replicated_init_handle: no loaded schema found\n");
85                 return NULL;
86         }
87
88         h = talloc_zero(req, struct ldb_handle);
89         if (h == NULL) {
90                 ldb_set_errstring(module->ldb, "Out of Memory");
91                 return NULL;
92         }
93
94         h->module       = module;
95         h->state        = LDB_ASYNC_PENDING;
96         h->status       = LDB_SUCCESS;
97
98         ar = talloc_zero(h, struct replmd_replicated_request);
99         if (ar == NULL) {
100                 ldb_set_errstring(module->ldb, "Out of Memory");
101                 talloc_free(h);
102                 return NULL;
103         }
104
105         h->private_data = ar;
106
107         ar->module      = module;
108         ar->handle      = h;
109         ar->orig_req    = req;
110         ar->schema      = schema;
111         ar->objs        = objs;
112
113         req->handle = h;
114
115         return ar;
116 }
117
118 /*
119   add a time element to a record
120 */
121 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
122 {
123         struct ldb_message_element *el;
124         char *s;
125
126         if (ldb_msg_find_element(msg, attr) != NULL) {
127                 return 0;
128         }
129
130         s = ldb_timestring(msg, t);
131         if (s == NULL) {
132                 return -1;
133         }
134
135         if (ldb_msg_add_string(msg, attr, s) != 0) {
136                 return -1;
137         }
138
139         el = ldb_msg_find_element(msg, attr);
140         /* always set as replace. This works because on add ops, the flag
141            is ignored */
142         el->flags = LDB_FLAG_MOD_REPLACE;
143
144         return 0;
145 }
146
147 /*
148   add a uint64_t element to a record
149 */
150 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
151 {
152         struct ldb_message_element *el;
153
154         if (ldb_msg_find_element(msg, attr) != NULL) {
155                 return 0;
156         }
157
158         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
159                 return -1;
160         }
161
162         el = ldb_msg_find_element(msg, attr);
163         /* always set as replace. This works because on add ops, the flag
164            is ignored */
165         el->flags = LDB_FLAG_MOD_REPLACE;
166
167         return 0;
168 }
169
170 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
171                                                    const struct replPropertyMetaData1 *m2,
172                                                    const uint32_t *rdn_attid)
173 {
174         if (m1->attid == m2->attid) {
175                 return 0;
176         }
177
178         /*
179          * the rdn attribute should be at the end!
180          * so we need to return a value greater than zero
181          * which means m1 is greater than m2
182          */
183         if (m1->attid == *rdn_attid) {
184                 return 1;
185         }
186
187         /*
188          * the rdn attribute should be at the end!
189          * so we need to return a value less than zero
190          * which means m2 is greater than m1
191          */
192         if (m2->attid == *rdn_attid) {
193                 return -1;
194         }
195
196         return m1->attid - m2->attid;
197 }
198
199 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
200                                                  const uint32_t *rdn_attid)
201 {
202         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
203                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
204 }
205
206 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
207                                                  const struct ldb_message_element *e2,
208                                                  const struct dsdb_schema *schema)
209 {
210         const struct dsdb_attribute *a1;
211         const struct dsdb_attribute *a2;
212
213         /* 
214          * TODO: make this faster by caching the dsdb_attribute pointer
215          *       on the ldb_messag_element
216          */
217
218         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
219         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
220
221         /*
222          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
223          *       in the schema
224          */
225         if (!a1 || !a2) {
226                 return strcasecmp(e1->name, e2->name);
227         }
228
229         return a1->attributeID_id - a2->attributeID_id;
230 }
231
232 static void replmd_ldb_message_sort(struct ldb_message *msg,
233                                     const struct dsdb_schema *schema)
234 {
235         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
236                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
237 }
238
239 static int replmd_prepare_originating(struct ldb_module *module, struct ldb_request *req,
240                                       struct ldb_dn *dn, const char *fn_name,
241                                       int (*fn)(struct ldb_module *,
242                                                 struct ldb_request *,
243                                                 const struct dsdb_schema *))
244 {
245         const struct dsdb_schema *schema;
246  
247         /* do not manipulate our control entries */
248         if (ldb_dn_is_special(dn)) {
249                 return ldb_next_request(module, req);
250         }
251
252         schema = dsdb_get_schema(module->ldb);
253         if (!schema) {
254                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
255                               "%s: no dsdb_schema loaded",
256                               fn_name);
257                 return LDB_ERR_CONSTRAINT_VIOLATION;
258         }
259
260         return fn(module, req, schema);
261 }
262
263 static int replmd_add_originating(struct ldb_module *module,
264                                   struct ldb_request *req,
265                                   const struct dsdb_schema *schema)
266 {
267         enum ndr_err_code ndr_err;
268         struct ldb_request *down_req;
269         struct ldb_message *msg;
270         const struct dsdb_attribute *rdn_attr = NULL;
271         struct GUID guid;
272         struct ldb_val guid_value;
273         struct replPropertyMetaDataBlob nmd;
274         struct ldb_val nmd_value;
275         uint64_t seq_num;
276         const struct GUID *our_invocation_id;
277         time_t t = time(NULL);
278         NTTIME now;
279         char *time_str;
280         int ret;
281         uint32_t i, ni=0;
282
283         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
284
285         if (ldb_msg_find_element(req->op.add.message, "objectGUID")) {
286                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
287                               "replmd_add_originating: it's not allowed to add an object with objectGUID\n");
288                 return LDB_ERR_UNWILLING_TO_PERFORM;
289         }
290
291         /* Get a sequence number from the backend */
292         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
293         if (ret != LDB_SUCCESS) {
294                 return ret;
295         }
296
297         /* a new GUID */
298         guid = GUID_random();
299
300         /* get our invicationId */
301         our_invocation_id = samdb_ntds_invocation_id(module->ldb);
302         if (!our_invocation_id) {
303                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
304                               "replmd_add_originating: unable to find invocationId\n");
305                 return LDB_ERR_OPERATIONS_ERROR;
306         }
307
308         /* create a copy of the request */
309         down_req = talloc(req, struct ldb_request);
310         if (down_req == NULL) {
311                 ldb_oom(module->ldb);
312                 return LDB_ERR_OPERATIONS_ERROR;
313         }
314         *down_req = *req;
315
316         /* we have to copy the message as the caller might have it as a const */
317         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
318         if (msg == NULL) {
319                 talloc_free(down_req);
320                 ldb_oom(module->ldb);
321                 return LDB_ERR_OPERATIONS_ERROR;
322         }
323
324         /* generated times */
325         unix_to_nt_time(&now, t);
326         time_str = ldb_timestring(msg, t);
327         if (!time_str) {
328                 talloc_free(down_req);
329                 return LDB_ERR_OPERATIONS_ERROR;
330         }
331
332         /* 
333          * remove autogenerated attributes
334          */
335         ldb_msg_remove_attr(msg, "whenCreated");
336         ldb_msg_remove_attr(msg, "whenChanged");
337         ldb_msg_remove_attr(msg, "uSNCreated");
338         ldb_msg_remove_attr(msg, "uSNChanged");
339         ldb_msg_remove_attr(msg, "replPropertyMetaData");
340
341         /*
342          * readd replicated attributes
343          */
344         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
345         if (ret != LDB_SUCCESS) {
346                 talloc_free(down_req);
347                 ldb_oom(module->ldb);
348                 return LDB_ERR_OPERATIONS_ERROR;
349         }
350
351         /* build the replication meta_data */
352         ZERO_STRUCT(nmd);
353         nmd.version             = 1;
354         nmd.ctr.ctr1.count      = msg->num_elements;
355         nmd.ctr.ctr1.array      = talloc_array(msg,
356                                                struct replPropertyMetaData1,
357                                                nmd.ctr.ctr1.count);
358         if (!nmd.ctr.ctr1.array) {
359                 talloc_free(down_req);
360                 ldb_oom(module->ldb);
361                 return LDB_ERR_OPERATIONS_ERROR;
362         }
363
364         for (i=0; i < msg->num_elements; i++) {
365                 struct ldb_message_element *e = &msg->elements[i];
366                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
367                 const struct dsdb_attribute *sa;
368
369                 if (e->name[0] == '@') continue;
370
371                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
372                 if (!sa) {
373                         ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
374                                       "replmd_add_originating: attribute '%s' not defined in schema\n",
375                                       e->name);
376                         talloc_free(down_req);
377                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
378                 }
379
380                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
381                         /* if the attribute is not replicated (0x00000001)
382                          * or constructed (0x00000004) it has no metadata
383                          */
384                         continue;
385                 }
386
387                 m->attid                        = sa->attributeID_id;
388                 m->version                      = 1;
389                 m->originating_change_time      = now;
390                 m->originating_invocation_id    = *our_invocation_id;
391                 m->originating_usn              = seq_num;
392                 m->local_usn                    = seq_num;
393                 ni++;
394
395                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
396                         rdn_attr = sa;
397                 }
398         }
399
400         /* fix meta data count */
401         nmd.ctr.ctr1.count = ni;
402
403         /*
404          * sort meta data array, and move the rdn attribute entry to the end
405          */
406         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
407
408         /* generated NDR encoded values */
409         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
410                                        NULL,
411                                        &guid,
412                                        (ndr_push_flags_fn_t)ndr_push_GUID);
413         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
414                 ldb_oom(module->ldb);
415                 return LDB_ERR_OPERATIONS_ERROR;
416         }
417         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
418                                        lp_iconv_convenience(ldb_get_opaque(module->ldb, "loadparm")),
419                                        &nmd,
420                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
421         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
422                 talloc_free(down_req);
423                 ldb_oom(module->ldb);
424                 return LDB_ERR_OPERATIONS_ERROR;
425         }
426
427         /*
428          * add the autogenerated values
429          */
430         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
431         if (ret != LDB_SUCCESS) {
432                 talloc_free(down_req);
433                 ldb_oom(module->ldb);
434                 return LDB_ERR_OPERATIONS_ERROR;
435         }
436         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
437         if (ret != LDB_SUCCESS) {
438                 talloc_free(down_req);
439                 ldb_oom(module->ldb);
440                 return LDB_ERR_OPERATIONS_ERROR;
441         }
442         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNCreated", seq_num);
443         if (ret != LDB_SUCCESS) {
444                 talloc_free(down_req);
445                 ldb_oom(module->ldb);
446                 return LDB_ERR_OPERATIONS_ERROR;
447         }
448         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNChanged", seq_num);
449         if (ret != LDB_SUCCESS) {
450                 talloc_free(down_req);
451                 ldb_oom(module->ldb);
452                 return LDB_ERR_OPERATIONS_ERROR;
453         }
454         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
455         if (ret != LDB_SUCCESS) {
456                 talloc_free(down_req);
457                 ldb_oom(module->ldb);
458                 return LDB_ERR_OPERATIONS_ERROR;
459         }
460
461         /*
462          * sort the attributes by attid before storing the object
463          */
464         replmd_ldb_message_sort(msg, schema);
465
466         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
467
468         /* go on with the call chain */
469         ret = ldb_next_request(module, down_req);
470
471         /* do not free down_req as the call results may be linked to it,
472          * it will be freed when the upper level request get freed */
473         if (ret == LDB_SUCCESS) {
474                 req->handle = down_req->handle;
475         }
476
477         return ret;
478 }
479
480 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
481 {
482         return replmd_prepare_originating(module, req, req->op.add.message->dn,
483                                           "replmd_add", replmd_add_originating);
484 }
485
486 static int replmd_modify_originating(struct ldb_module *module,
487                                      struct ldb_request *req,
488                                      const struct dsdb_schema *schema)
489 {
490         struct ldb_request *down_req;
491         struct ldb_message *msg;
492         int ret;
493         time_t t = time(NULL);
494         uint64_t seq_num;
495
496         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
497
498         down_req = talloc(req, struct ldb_request);
499         if (down_req == NULL) {
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502
503         *down_req = *req;
504
505         /* we have to copy the message as the caller might have it as a const */
506         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
507         if (msg == NULL) {
508                 talloc_free(down_req);
509                 return LDB_ERR_OPERATIONS_ERROR;
510         }
511
512         /* TODO:
513          * - get the whole old object
514          * - if the old object doesn't exist report an error
515          * - give an error when a readonly attribute should
516          *   be modified
517          * - merge the changed into the old object
518          *   if the caller set values to the same value
519          *   ignore the attribute, return success when no
520          *   attribute was changed
521          * - calculate the new replPropertyMetaData attribute
522          */
523
524         if (add_time_element(msg, "whenChanged", t) != 0) {
525                 talloc_free(down_req);
526                 return LDB_ERR_OPERATIONS_ERROR;
527         }
528
529         /* Get a sequence number from the backend */
530         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
531         if (ret == LDB_SUCCESS) {
532                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
533                         talloc_free(down_req);
534                         return LDB_ERR_OPERATIONS_ERROR;
535                 }
536         }
537
538         /* TODO:
539          * - sort the attributes by attid with replmd_ldb_message_sort()
540          * - replace the old object with the newly constructed one
541          */
542
543         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
544
545         /* go on with the call chain */
546         ret = ldb_next_request(module, down_req);
547
548         /* do not free down_req as the call results may be linked to it,
549          * it will be freed when the upper level request get freed */
550         if (ret == LDB_SUCCESS) {
551                 req->handle = down_req->handle;
552         }
553
554         return ret;
555 }
556
557 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
558 {
559         return replmd_prepare_originating(module, req, req->op.mod.message->dn,
560                                           "replmd_modify", replmd_modify_originating);
561 }
562
563 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
564 {
565         struct ldb_reply *ares = NULL;
566
567         ar->handle->status = ret;
568         ar->handle->state = LDB_ASYNC_DONE;
569
570         if (!ar->orig_req->callback) {
571                 return LDB_SUCCESS;
572         }
573         
574         /* we're done and need to report the success to the caller */
575         ares = talloc_zero(ar, struct ldb_reply);
576         if (!ares) {
577                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
578                 ar->handle->state = LDB_ASYNC_DONE;
579                 return LDB_ERR_OPERATIONS_ERROR;
580         }
581
582         ares->type      = LDB_REPLY_EXTENDED;
583         ares->response  = NULL;
584
585         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
586 }
587
588 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
589 {
590         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
591 }
592
/*
  finish the replicated request with the given LDB error code
*/
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        const int status = ret;

        return replmd_replicated_request_reply_helper(ar, status);
}
597
598 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
599 {
600         int ret = LDB_ERR_OTHER;
601         /* TODO: do some error mapping */
602         return replmd_replicated_request_reply_helper(ar, ret);
603 }
604
605 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
606
607 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
608                                                 void *private_data,
609                                                 struct ldb_reply *ares)
610 {
611 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
612         struct replmd_replicated_request *ar = talloc_get_type(private_data,
613                                                struct replmd_replicated_request);
614
615         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
616         if (ar->sub.change_ret != LDB_SUCCESS) {
617                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
618         }
619
620         talloc_free(ar->sub.mem_ctx);
621         ZERO_STRUCT(ar->sub);
622
623         ar->index_current++;
624
625         return replmd_replicated_apply_next(ar);
626 #else
627         return LDB_SUCCESS;
628 #endif
629 }
630
631 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
632 {
633         enum ndr_err_code ndr_err;
634         struct ldb_message *msg;
635         struct replPropertyMetaDataBlob *md;
636         struct ldb_val md_value;
637         uint32_t i;
638         uint64_t seq_num;
639         int ret;
640
641         /*
642          * TODO: check if the parent object exist
643          */
644
645         /*
646          * TODO: handle the conflict case where an object with the
647          *       same name exist
648          */
649
650         msg = ar->objs->objects[ar->index_current].msg;
651         md = ar->objs->objects[ar->index_current].meta_data;
652
653         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
654         if (ret != LDB_SUCCESS) {
655                 return replmd_replicated_request_error(ar, ret);
656         }
657
658         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
659         if (ret != LDB_SUCCESS) {
660                 return replmd_replicated_request_error(ar, ret);
661         }
662
663         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
664         if (ret != LDB_SUCCESS) {
665                 return replmd_replicated_request_error(ar, ret);
666         }
667
668         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
669         if (ret != LDB_SUCCESS) {
670                 return replmd_replicated_request_error(ar, ret);
671         }
672
673         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
674         if (ret != LDB_SUCCESS) {
675                 return replmd_replicated_request_error(ar, ret);
676         }
677
678         /*
679          * the meta data array is already sorted by the caller
680          */
681         for (i=0; i < md->ctr.ctr1.count; i++) {
682                 md->ctr.ctr1.array[i].local_usn = seq_num;
683         }
684         ndr_err = ndr_push_struct_blob(&md_value, msg, 
685                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
686                                        md,
687                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
688         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
689                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
690                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
691         }
692         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
693         if (ret != LDB_SUCCESS) {
694                 return replmd_replicated_request_error(ar, ret);
695         }
696
697         replmd_ldb_message_sort(msg, ar->schema);
698
699         ret = ldb_build_add_req(&ar->sub.change_req,
700                                 ar->module->ldb,
701                                 ar->sub.mem_ctx,
702                                 msg,
703                                 NULL,
704                                 ar,
705                                 replmd_replicated_apply_add_callback);
706         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
707
708 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
709         return ldb_next_request(ar->module, ar->sub.change_req);
710 #else
711         ret = ldb_next_request(ar->module, ar->sub.change_req);
712         if (ret != LDB_SUCCESS) {
713                 ldb_asprintf_errstring(ar->module->ldb, "Failed to add replicated object %s: %s", ldb_dn_get_linearized(ar->sub.change_req->op.add.message->dn), 
714                                        ldb_errstring(ar->module->ldb));
715                 return replmd_replicated_request_error(ar, ret);
716         }
717
718         ar->sub.change_ret = ldb_wait(ar->sub.change_req->handle, LDB_WAIT_ALL);
719         if (ar->sub.change_ret != LDB_SUCCESS) {
720                 ldb_asprintf_errstring(ar->module->ldb, "Failed while waiting on add replicated object %s: %s", ldb_dn_get_linearized(ar->sub.change_req->op.add.message->dn), 
721                                        ldb_errstring(ar->module->ldb));
722                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
723         }
724
725         talloc_free(ar->sub.mem_ctx);
726         ZERO_STRUCT(ar->sub);
727
728         ar->index_current++;
729
730         return LDB_SUCCESS;
731 #endif
732 }
733
734 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
735                                                          struct replPropertyMetaData1 *m2)
736 {
737         int ret;
738
739         if (m1->version != m2->version) {
740                 return m1->version - m2->version;
741         }
742
743         if (m1->originating_change_time != m2->originating_change_time) {
744                 return m1->originating_change_time - m2->originating_change_time;
745         }
746
747         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
748         if (ret != 0) {
749                 return ret;
750         }
751
752         return m1->originating_usn - m2->originating_usn;
753 }
754
755 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
756                                                   void *private_data,
757                                                   struct ldb_reply *ares)
758 {
759 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
760         struct replmd_replicated_request *ar = talloc_get_type(private_data,
761                                                struct replmd_replicated_request);
762
763         ret = ldb_next_request(ar->module, ar->sub.change_req);
764         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
765
766         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
767         if (ar->sub.change_ret != LDB_SUCCESS) {
768                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
769         }
770
771         talloc_free(ar->sub.mem_ctx);
772         ZERO_STRUCT(ar->sub);
773
774         ar->index_current++;
775
776         return LDB_SUCCESS;
777 #else
778         return LDB_SUCCESS;
779 #endif
780 }
781
782 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
783 {
784         enum ndr_err_code ndr_err;
785         struct ldb_message *msg;
786         struct replPropertyMetaDataBlob *rmd;
787         struct replPropertyMetaDataBlob omd;
788         const struct ldb_val *omd_value;
789         struct replPropertyMetaDataBlob nmd;
790         struct ldb_val nmd_value;
791         uint32_t i,j,ni=0;
792         uint32_t removed_attrs = 0;
793         uint64_t seq_num;
794         int ret;
795
796         msg = ar->objs->objects[ar->index_current].msg;
797         rmd = ar->objs->objects[ar->index_current].meta_data;
798         ZERO_STRUCT(omd);
799         omd.version = 1;
800
801         /*
802          * TODO: add rename conflict handling
803          */
804         if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
805                 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
806                               ar->index_current);
807                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
808                           ldb_dn_get_linearized(ar->sub.search_msg->dn),
809                           ldb_dn_get_linearized(msg->dn));
810                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
811         }
812
813         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
814         if (ret != LDB_SUCCESS) {
815                 return replmd_replicated_request_error(ar, ret);
816         }
817
818         /* find existing meta data */
819         omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
820         if (omd_value) {
821                 ndr_err = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, 
822                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &omd,
823                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
824                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
825                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
826                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
827                 }
828
829                 if (omd.version != 1) {
830                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
831                 }
832         }
833
834         ZERO_STRUCT(nmd);
835         nmd.version = 1;
836         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
837         nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
838                                           struct replPropertyMetaData1,
839                                           nmd.ctr.ctr1.count);
840         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
841
842         /* first copy the old meta data */
843         for (i=0; i < omd.ctr.ctr1.count; i++) {
844                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
845                 ni++;
846         }
847
848         /* now merge in the new meta data */
849         for (i=0; i < rmd->ctr.ctr1.count; i++) {
850                 bool found = false;
851
852                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
853
854                 for (j=0; j < ni; j++) {
855                         int cmp;
856
857                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
858                                 continue;
859                         }
860
861                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
862                                                                             &nmd.ctr.ctr1.array[j]);
863                         if (cmp > 0) {
864                                 /* replace the entry */
865                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
866                                 found = true;
867                                 break;
868                         }
869
870                         /* we don't want to apply this change so remove the attribute */
871                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
872                         removed_attrs++;
873
874                         found = true;
875                         break;
876                 }
877
878                 if (found) continue;
879
880                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
881                 ni++;
882         }
883
884         /*
885          * finally correct the size of the meta_data array
886          */
887         nmd.ctr.ctr1.count = ni;
888
889         /*
890          * the rdn attribute (the alias for the name attribute),
891          * 'cn' for most objects is the last entry in the meta data array
892          * we have stored
893          *
894          * sort the new meta data array
895          */
896         {
897                 struct replPropertyMetaData1 *rdn_p;
898                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
899
900                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
901                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
902         }
903
904         /* create the meta data value */
905         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
906                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
907                                        &nmd,
908                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
909         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
910                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
911                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
912         }
913
914         /*
915          * check if some replicated attributes left, otherwise skip the ldb_modify() call
916          */
917         if (msg->num_elements == 0) {
918                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
919                           ar->index_current);
920                 goto next_object;
921         }
922
923         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
924                   ar->index_current, msg->num_elements);
925
926         /*
927          * when we now that we'll modify the record, add the whenChanged, uSNChanged
928          * and replPopertyMetaData attributes
929          */
930         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
931         if (ret != LDB_SUCCESS) {
932                 return replmd_replicated_request_error(ar, ret);
933         }
934         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
935         if (ret != LDB_SUCCESS) {
936                 return replmd_replicated_request_error(ar, ret);
937         }
938         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
939         if (ret != LDB_SUCCESS) {
940                 return replmd_replicated_request_error(ar, ret);
941         }
942
943         replmd_ldb_message_sort(msg, ar->schema);
944
945         /* we want to replace the old values */
946         for (i=0; i < msg->num_elements; i++) {
947                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
948         }
949
950         ret = ldb_build_mod_req(&ar->sub.change_req,
951                                 ar->module->ldb,
952                                 ar->sub.mem_ctx,
953                                 msg,
954                                 NULL,
955                                 ar,
956                                 replmd_replicated_apply_merge_callback);
957         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
958
959 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
960         return ldb_next_request(ar->module, ar->sub.change_req);
961 #else
962         ret = ldb_next_request(ar->module, ar->sub.change_req);
963         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
964
965         ar->sub.change_ret = ldb_wait(ar->sub.change_req->handle, LDB_WAIT_ALL);
966         if (ar->sub.change_ret != LDB_SUCCESS) {
967                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
968         }
969
970 next_object:
971         talloc_free(ar->sub.mem_ctx);
972         ZERO_STRUCT(ar->sub);
973
974         ar->index_current++;
975
976         return LDB_SUCCESS;
977 #endif
978 }
979
980 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
981                                                    void *private_data,
982                                                    struct ldb_reply *ares)
983 {
984         struct replmd_replicated_request *ar = talloc_get_type(private_data,
985                                                struct replmd_replicated_request);
986         bool is_done = false;
987
988         switch (ares->type) {
989         case LDB_REPLY_ENTRY:
990                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
991                 break;
992         case LDB_REPLY_REFERRAL:
993                 /* we ignore referrals */
994                 break;
995         case LDB_REPLY_EXTENDED:
996         case LDB_REPLY_DONE:
997                 is_done = true;
998         }
999
1000         talloc_free(ares);
1001
1002 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1003         if (is_done) {
1004                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1005                 if (ar->sub.search_ret != LDB_SUCCESS) {
1006                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1007                 }
1008                 if (ar->sub.search_msg) {
1009                         return replmd_replicated_apply_merge(ar);
1010                 }
1011                 return replmd_replicated_apply_add(ar);
1012         }
1013 #endif
1014         return LDB_SUCCESS;
1015 }
1016
1017 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
1018 {
1019         int ret;
1020         char *tmp_str;
1021         char *filter;
1022
1023         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
1024         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1025
1026         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
1027         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1028         talloc_free(tmp_str);
1029
1030         ret = ldb_build_search_req(&ar->sub.search_req,
1031                                    ar->module->ldb,
1032                                    ar->sub.mem_ctx,
1033                                    ar->objs->partition_dn,
1034                                    LDB_SCOPE_SUBTREE,
1035                                    filter,
1036                                    NULL,
1037                                    NULL,
1038                                    ar,
1039                                    replmd_replicated_apply_search_callback);
1040         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1041
1042 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1043         return ldb_next_request(ar->module, ar->sub.search_req);
1044 #else
1045         ret = ldb_next_request(ar->module, ar->sub.search_req);
1046         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1047
1048         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1049         if (ar->sub.search_ret != LDB_SUCCESS && ar->sub.search_ret != LDB_ERR_NO_SUCH_OBJECT) {
1050                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1051         }
1052         if (ar->sub.search_msg) {
1053                 return replmd_replicated_apply_merge(ar);
1054         }
1055
1056         return replmd_replicated_apply_add(ar);
1057 #endif
1058 }
1059
1060 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1061 {
1062 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1063         if (ar->index_current >= ar->objs->num_objects) {
1064                 return replmd_replicated_uptodate_vector(ar);
1065         }
1066 #endif
1067
1068         ar->sub.mem_ctx = talloc_new(ar);
1069         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1070
1071         return replmd_replicated_apply_search(ar);
1072 }
1073
1074 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
1075                                                       void *private_data,
1076                                                       struct ldb_reply *ares)
1077 {
1078 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1079         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1080                                                struct replmd_replicated_request);
1081
1082         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1083         if (ar->sub.change_ret != LDB_SUCCESS) {
1084                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1085         }
1086
1087         talloc_free(ar->sub.mem_ctx);
1088         ZERO_STRUCT(ar->sub);
1089
1090         return replmd_replicated_request_done(ar);
1091 #else
1092         return LDB_SUCCESS;
1093 #endif
1094 }
1095
1096 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1097                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1098 {
1099         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1100 }
1101
1102 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1103 {
1104         enum ndr_err_code ndr_err;
1105         struct ldb_message *msg;
1106         struct replUpToDateVectorBlob ouv;
1107         const struct ldb_val *ouv_value;
1108         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1109         struct replUpToDateVectorBlob nuv;
1110         struct ldb_val nuv_value;
1111         struct ldb_message_element *nuv_el = NULL;
1112         const struct GUID *our_invocation_id;
1113         struct ldb_message_element *orf_el = NULL;
1114         struct repsFromToBlob nrf;
1115         struct ldb_val *nrf_value = NULL;
1116         struct ldb_message_element *nrf_el = NULL;
1117         uint32_t i,j,ni=0;
1118         uint64_t seq_num;
1119         bool found = false;
1120         time_t t = time(NULL);
1121         NTTIME now;
1122         int ret;
1123
1124         ruv = ar->objs->uptodateness_vector;
1125         ZERO_STRUCT(ouv);
1126         ouv.version = 2;
1127         ZERO_STRUCT(nuv);
1128         nuv.version = 2;
1129
1130         unix_to_nt_time(&now, t);
1131
1132         /* 
1133          * we use the next sequence number for our own highest_usn
1134          * because we will do a modify request and this will increment
1135          * our highest_usn
1136          */
1137         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
1138         if (ret != LDB_SUCCESS) {
1139                 return replmd_replicated_request_error(ar, ret);
1140         }
1141
1142         /*
1143          * first create the new replUpToDateVector
1144          */
1145         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
1146         if (ouv_value) {
1147                 ndr_err = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, 
1148                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &ouv,
1149                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1150                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1151                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1152                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1153                 }
1154
1155                 if (ouv.version != 2) {
1156                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1157                 }
1158         }
1159
1160         /*
1161          * the new uptodateness vector will at least
1162          * contain 1 entry, one for the source_dsa
1163          *
1164          * plus optional values from our old vector and the one from the source_dsa
1165          */
1166         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1167         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1168         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
1169                                             struct drsuapi_DsReplicaCursor2,
1170                                             nuv.ctr.ctr2.count);
1171         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1172
1173         /* first copy the old vector */
1174         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1175                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1176                 ni++;
1177         }
1178
1179         /* get our invocation_id if we have one already attached to the ldb */
1180         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1181
1182         /* merge in the source_dsa vector is available */
1183         for (i=0; (ruv && i < ruv->count); i++) {
1184                 found = false;
1185
1186                 if (our_invocation_id &&
1187                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1188                                our_invocation_id)) {
1189                         continue;
1190                 }
1191
1192                 for (j=0; j < ni; j++) {
1193                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1194                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1195                                 continue;
1196                         }
1197
1198                         found = true;
1199
1200                         /*
1201                          * we update only the highest_usn and not the latest_sync_success time,
1202                          * because the last success stands for direct replication
1203                          */
1204                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1205                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1206                         }
1207                         break;                  
1208                 }
1209
1210                 if (found) continue;
1211
1212                 /* if it's not there yet, add it */
1213                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1214                 ni++;
1215         }
1216
1217         /*
1218          * merge in the current highwatermark for the source_dsa
1219          */
1220         found = false;
1221         for (j=0; j < ni; j++) {
1222                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1223                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1224                         continue;
1225                 }
1226
1227                 found = true;
1228
1229                 /*
1230                  * here we update the highest_usn and last_sync_success time
1231                  * because we're directly replicating from the source_dsa
1232                  *
1233                  * and use the tmp_highest_usn because this is what we have just applied
1234                  * to our ldb
1235                  */
1236                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1237                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1238                 break;
1239         }
1240         if (!found) {
1241                 /*
1242                  * here we update the highest_usn and last_sync_success time
1243                  * because we're directly replicating from the source_dsa
1244                  *
1245                  * and use the tmp_highest_usn because this is what we have just applied
1246                  * to our ldb
1247                  */
1248                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1249                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1250                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1251                 ni++;
1252         }
1253
1254         /*
1255          * finally correct the size of the cursors array
1256          */
1257         nuv.ctr.ctr2.count = ni;
1258
1259         /*
1260          * sort the cursors
1261          */
1262         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1263               sizeof(struct drsuapi_DsReplicaCursor2),
1264               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1265
1266         /*
1267          * create the change ldb_message
1268          */
1269         msg = ldb_msg_new(ar->sub.mem_ctx);
1270         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1271         msg->dn = ar->sub.search_msg->dn;
1272
1273         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1274                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), 
1275                                        &nuv,
1276                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1277         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1278                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1279                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1280         }
1281         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1282         if (ret != LDB_SUCCESS) {
1283                 return replmd_replicated_request_error(ar, ret);
1284         }
1285         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1286
1287         /*
1288          * now create the new repsFrom value from the given repsFromTo1 structure
1289          */
1290         ZERO_STRUCT(nrf);
1291         nrf.version                                     = 1;
1292         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1293         /* and fix some values... */
1294         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1295         nrf.ctr.ctr1.last_success                       = now;
1296         nrf.ctr.ctr1.last_attempt                       = now;
1297         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1298         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1299
1300         /*
1301          * first see if we already have a repsFrom value for the current source dsa
1302          * if so we'll later replace this value
1303          */
1304         orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1305         if (orf_el) {
1306                 for (i=0; i < orf_el->num_values; i++) {
1307                         struct repsFromToBlob *trf;
1308
1309                         trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1310                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1311
1312                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), trf,
1313                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1314                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1315                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1316                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1317                         }
1318
1319                         if (trf->version != 1) {
1320                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1321                         }
1322
1323                         /*
1324                          * we compare the source dsa objectGUID not the invocation_id
1325                          * because we want only one repsFrom value per source dsa
1326                          * and when the invocation_id of the source dsa has changed we don't need 
1327                          * the old repsFrom with the old invocation_id
1328                          */
1329                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1330                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1331                                 talloc_free(trf);
1332                                 continue;
1333                         }
1334
1335                         talloc_free(trf);
1336                         nrf_value = &orf_el->values[i];
1337                         break;
1338                 }
1339
1340                 /*
1341                  * copy over all old values to the new ldb_message
1342                  */
1343                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1344                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1345                 *nrf_el = *orf_el;
1346         }
1347
1348         /*
1349          * if we haven't found an old repsFrom value for the current source dsa
1350          * we'll add a new value
1351          */
1352         if (!nrf_value) {
1353                 struct ldb_val zero_value;
1354                 ZERO_STRUCT(zero_value);
1355                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1356                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1357
1358                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1359         }
1360
1361         /* we now fill the value which is already attached to ldb_message */
1362         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1363                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
1364                                        &nrf,
1365                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1366         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1367                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1368                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1369         }
1370
1371         /* 
1372          * the ldb_message_element for the attribute, has all the old values and the new one
1373          * so we'll replace the whole attribute with all values
1374          */
1375         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1376
1377         /* prepare the ldb_modify() request */
1378         ret = ldb_build_mod_req(&ar->sub.change_req,
1379                                 ar->module->ldb,
1380                                 ar->sub.mem_ctx,
1381                                 msg,
1382                                 NULL,
1383                                 ar,
1384                                 replmd_replicated_uptodate_modify_callback);
1385         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1386
1387 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1388         return ldb_next_request(ar->module, ar->sub.change_req);
1389 #else
1390         ret = ldb_next_request(ar->module, ar->sub.change_req);
1391         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1392
1393         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1394         if (ar->sub.change_ret != LDB_SUCCESS) {
1395                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1396         }
1397
1398         talloc_free(ar->sub.mem_ctx);
1399         ZERO_STRUCT(ar->sub);
1400
1401         return replmd_replicated_request_done(ar);
1402 #endif
1403 }
1404
1405 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1406                                                       void *private_data,
1407                                                       struct ldb_reply *ares)
1408 {
1409         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1410                                                struct replmd_replicated_request);
1411         bool is_done = false;
1412
1413         switch (ares->type) {
1414         case LDB_REPLY_ENTRY:
1415                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1416                 break;
1417         case LDB_REPLY_REFERRAL:
1418                 /* we ignore referrals */
1419                 break;
1420         case LDB_REPLY_EXTENDED:
1421         case LDB_REPLY_DONE:
1422                 is_done = true;
1423         }
1424
1425         talloc_free(ares);
1426
1427 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1428         if (is_done) {
1429                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1430                 if (ar->sub.search_ret != LDB_SUCCESS) {
1431                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1432                 }
1433                 if (!ar->sub.search_msg) {
1434                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1435                 }
1436
1437                 return replmd_replicated_uptodate_modify(ar);
1438         }
1439 #endif
1440         return LDB_SUCCESS;
1441 }
1442
1443 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1444 {
1445         int ret;
1446         static const char *attrs[] = {
1447                 "replUpToDateVector",
1448                 "repsFrom",
1449                 NULL
1450         };
1451
1452         ret = ldb_build_search_req(&ar->sub.search_req,
1453                                    ar->module->ldb,
1454                                    ar->sub.mem_ctx,
1455                                    ar->objs->partition_dn,
1456                                    LDB_SCOPE_BASE,
1457                                    "(objectClass=*)",
1458                                    attrs,
1459                                    NULL,
1460                                    ar,
1461                                    replmd_replicated_uptodate_search_callback);
1462         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1463
1464 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1465         return ldb_next_request(ar->module, ar->sub.search_req);
1466 #else
1467         ret = ldb_next_request(ar->module, ar->sub.search_req);
1468         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1469
1470         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1471         if (ar->sub.search_ret != LDB_SUCCESS) {
1472                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1473         }
1474         if (!ar->sub.search_msg) {
1475                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1476         }
1477
1478         return replmd_replicated_uptodate_modify(ar);
1479 #endif
1480 }
1481
1482 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1483 {
1484         ar->sub.mem_ctx = talloc_new(ar);
1485         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1486
1487         return replmd_replicated_uptodate_search(ar);
1488 }
1489
1490 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1491 {
1492         struct dsdb_extended_replicated_objects *objs;
1493         struct replmd_replicated_request *ar;
1494
1495         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1496
1497         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1498         if (!objs) {
1499                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1500                 return LDB_ERR_PROTOCOL_ERROR;
1501         }
1502
1503         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1504                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1505                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1506                 return LDB_ERR_PROTOCOL_ERROR;
1507         }
1508
1509         ar = replmd_replicated_init_handle(module, req, objs);
1510         if (!ar) {
1511                 return LDB_ERR_OPERATIONS_ERROR;
1512         }
1513
1514 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1515         return replmd_replicated_apply_next(ar);
1516 #else
1517         while (ar->index_current < ar->objs->num_objects &&
1518                req->handle->state != LDB_ASYNC_DONE) { 
1519                 replmd_replicated_apply_next(ar);
1520         }
1521
1522         if (req->handle->state != LDB_ASYNC_DONE) {
1523                 replmd_replicated_uptodate_vector(ar);
1524         }
1525
1526         return LDB_SUCCESS;
1527 #endif
1528 }
1529
1530 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1531 {
1532         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1533                 return replmd_extended_replicated_objects(module, req);
1534         }
1535
1536         return ldb_next_request(module, req);
1537 }
1538
1539 static int replmd_wait_none(struct ldb_handle *handle) {
1540         struct replmd_replicated_request *ar;
1541     
1542         if (!handle || !handle->private_data) {
1543                 return LDB_ERR_OPERATIONS_ERROR;
1544         }
1545
1546         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1547         if (!ar) {
1548                 return LDB_ERR_OPERATIONS_ERROR;
1549         }
1550
1551         /* we do only sync calls */
1552         if (handle->state != LDB_ASYNC_DONE) {
1553                 return LDB_ERR_OPERATIONS_ERROR;
1554         }
1555
1556         return handle->status;
1557 }
1558
1559 static int replmd_wait_all(struct ldb_handle *handle) {
1560
1561         int ret;
1562
1563         while (handle->state != LDB_ASYNC_DONE) {
1564                 ret = replmd_wait_none(handle);
1565                 if (ret != LDB_SUCCESS) {
1566                         return ret;
1567                 }
1568         }
1569
1570         return handle->status;
1571 }
1572
1573 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1574 {
1575         if (type == LDB_WAIT_ALL) {
1576                 return replmd_wait_all(handle);
1577         } else {
1578                 return replmd_wait_none(handle);
1579         }
1580 }
1581
1582 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
1583         .name          = "repl_meta_data",
1584         .add           = replmd_add,
1585         .modify        = replmd_modify,
1586         .extended      = replmd_extended,
1587         .wait          = replmd_wait
1588 };