r20975: - implement handling of meta data an on originating add
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb
30  *
31  *  Component: ldb repl_meta_data module
32  *
33  *  Description: - add a unique objectGUID onto every new record,
34  *               - handle whenCreated, whenChanged timestamps
35  *               - handle uSNCreated, uSNChanged numbers
36  *               - handle replPropertyMetaData attribute
37  *
38  *  Author: Simo Sorce
39  *  Author: Stefan Metzmacher
40  */
41
42 #include "includes.h"
43 #include "lib/ldb/include/ldb.h"
44 #include "lib/ldb/include/ldb_errors.h"
45 #include "lib/ldb/include/ldb_private.h"
46 #include "dsdb/samdb/samdb.h"
47 #include "dsdb/common/flags.h"
48 #include "librpc/gen_ndr/ndr_misc.h"
49 #include "librpc/gen_ndr/ndr_drsuapi.h"
50 #include "librpc/gen_ndr/ndr_drsblobs.h"
51
52 struct replmd_replicated_request {
53         struct ldb_module *module;
54         struct ldb_handle *handle;
55         struct ldb_request *orig_req;
56
57         const struct dsdb_schema *schema;
58
59         struct dsdb_extended_replicated_objects *objs;
60
61         uint32_t index_current;
62
63         struct {
64                 TALLOC_CTX *mem_ctx;
65                 struct ldb_request *search_req;
66                 struct ldb_message *search_msg;
67                 int search_ret;
68                 struct ldb_request *change_req;
69                 int change_ret;
70         } sub;
71 };
72
73 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
74                                                                        struct ldb_request *req,
75                                                                        struct dsdb_extended_replicated_objects *objs)
76 {
77         struct replmd_replicated_request *ar;
78         struct ldb_handle *h;
79         const struct dsdb_schema *schema;
80
81         schema = dsdb_get_schema(module->ldb);
82         if (!schema) {
83                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
84                               "replmd_replicated_init_handle: no loaded schema found\n");
85                 return NULL;
86         }
87
88         h = talloc_zero(req, struct ldb_handle);
89         if (h == NULL) {
90                 ldb_set_errstring(module->ldb, "Out of Memory");
91                 return NULL;
92         }
93
94         h->module       = module;
95         h->state        = LDB_ASYNC_PENDING;
96         h->status       = LDB_SUCCESS;
97
98         ar = talloc_zero(h, struct replmd_replicated_request);
99         if (ar == NULL) {
100                 ldb_set_errstring(module->ldb, "Out of Memory");
101                 talloc_free(h);
102                 return NULL;
103         }
104
105         h->private_data = ar;
106
107         ar->module      = module;
108         ar->handle      = h;
109         ar->orig_req    = req;
110         ar->schema      = schema;
111         ar->objs        = objs;
112
113         req->handle = h;
114
115         return ar;
116 }
117
118 /*
119   add a time element to a record
120 */
121 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
122 {
123         struct ldb_message_element *el;
124         char *s;
125
126         if (ldb_msg_find_element(msg, attr) != NULL) {
127                 return 0;
128         }
129
130         s = ldb_timestring(msg, t);
131         if (s == NULL) {
132                 return -1;
133         }
134
135         if (ldb_msg_add_string(msg, attr, s) != 0) {
136                 return -1;
137         }
138
139         el = ldb_msg_find_element(msg, attr);
140         /* always set as replace. This works because on add ops, the flag
141            is ignored */
142         el->flags = LDB_FLAG_MOD_REPLACE;
143
144         return 0;
145 }
146
147 /*
148   add a uint64_t element to a record
149 */
150 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
151 {
152         struct ldb_message_element *el;
153
154         if (ldb_msg_find_element(msg, attr) != NULL) {
155                 return 0;
156         }
157
158         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
159                 return -1;
160         }
161
162         el = ldb_msg_find_element(msg, attr);
163         /* always set as replace. This works because on add ops, the flag
164            is ignored */
165         el->flags = LDB_FLAG_MOD_REPLACE;
166
167         return 0;
168 }
169
170 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
171                                                    const struct replPropertyMetaData1 *m2,
172                                                    const uint32_t *rdn_attid)
173 {
174         if (m1->attid == m2->attid) {
175                 return 0;
176         }
177
178         /*
179          * the rdn attribute should be at the end!
180          * so we need to return a value greater than zero
181          * which means m1 is greater than m2
182          */
183         if (m1->attid == *rdn_attid) {
184                 return 1;
185         }
186
187         /*
188          * the rdn attribute should be at the end!
189          * so we need to return a value less than zero
190          * which means m2 is greater than m1
191          */
192         if (m2->attid == *rdn_attid) {
193                 return -1;
194         }
195
196         return m1->attid - m2->attid;
197 }
198
199 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
200                                                  const uint32_t *rdn_attid)
201 {
202         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
203                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
204 }
205
206 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
207                                                  const struct ldb_message_element *e2,
208                                                  const struct dsdb_schema *schema)
209 {
210         const struct dsdb_attribute *a1;
211         const struct dsdb_attribute *a2;
212
213         /* 
214          * TODO: make this faster by caching the dsdb_attribute pointer
215          *       on the ldb_messag_element
216          */
217
218         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
219         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
220
221         /*
222          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
223          *       in the schema
224          */
225         if (!a1 || !a2) {
226                 return strcasecmp(e1->name, e2->name);
227         }
228
229         return a1->attributeID_id - a2->attributeID_id;
230 }
231
232 static void replmd_ldb_message_sort(struct ldb_message *msg,
233                                     const struct dsdb_schema *schema)
234 {
235         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
236                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
237 }
238
239 static int replmd_prepare_originating(struct ldb_module *module, struct ldb_request *req,
240                                       struct ldb_dn *dn, const char *fn_name,
241                                       int (*fn)(struct ldb_module *,
242                                                 struct ldb_request *,
243                                                 const struct dsdb_schema *,
244                                                 const struct dsdb_control_current_partition *))
245 {
246         const struct dsdb_schema *schema;
247         const struct ldb_control *partition_ctrl;
248         const struct dsdb_control_current_partition *partition;
249  
250         /* do not manipulate our control entries */
251         if (ldb_dn_is_special(dn)) {
252                 return ldb_next_request(module, req);
253         }
254
255         schema = dsdb_get_schema(module->ldb);
256         if (!schema) {
257                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
258                               "%s: no dsdb_schema loaded",
259                               fn_name);
260                 return LDB_ERR_CONSTRAINT_VIOLATION;
261         }
262
263         partition_ctrl = get_control_from_list(req->controls, DSDB_CONTROL_CURRENT_PARTITION_OID);
264         if (!partition_ctrl) {
265                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
266                               "%s: no current partition control found",
267                               fn_name);
268                 return LDB_ERR_CONSTRAINT_VIOLATION;
269         }
270
271         partition = talloc_get_type(partition_ctrl->data,
272                                     struct dsdb_control_current_partition);
273         if (!partition) {
274                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
275                               "%s: current partition control contains invalid data",
276                               fn_name);
277                 return LDB_ERR_CONSTRAINT_VIOLATION;
278         }
279
280         if (partition->version != DSDB_CONTROL_CURRENT_PARTITION_VERSION) {
281                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
282                               "%s: current partition control contains invalid version [%u != %u]\n",
283                               fn_name, partition->version, DSDB_CONTROL_CURRENT_PARTITION_VERSION);
284                 return LDB_ERR_CONSTRAINT_VIOLATION;
285         }
286
287         return fn(module, req, schema, partition);
288 }
289
290 static int replmd_add_originating(struct ldb_module *module,
291                                   struct ldb_request *req,
292                                   const struct dsdb_schema *schema,
293                                   const struct dsdb_control_current_partition *partition)
294 {
295         NTSTATUS nt_status;
296         struct ldb_request *down_req;
297         struct ldb_message *msg;
298         uint32_t instance_type;
299         struct ldb_dn *new_dn;
300         const char *rdn_name;
301         const char *rdn_name_upper;
302         const struct ldb_val *rdn_value = NULL;
303         const struct dsdb_attribute *rdn_attr = NULL;
304         struct GUID guid;
305         struct ldb_val guid_value;
306         struct replPropertyMetaDataBlob nmd;
307         struct ldb_val nmd_value;
308         uint64_t seq_num;
309         const struct GUID *our_invocation_id;
310         time_t t = time(NULL);
311         NTTIME now;
312         char *time_str;
313         int ret;
314         uint32_t i, ni=0;
315
316         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
317
318         if (ldb_msg_find_element(req->op.add.message, "objectGUID")) {
319                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
320                               "replmd_add_originating: it's not allowed to add an object with objectGUID\n");
321                 return LDB_ERR_UNWILLING_TO_PERFORM;
322         }
323
324         if (ldb_msg_find_element(req->op.add.message, "instanceType")) {
325                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
326                               "replmd_add_originating: it's not allowed to add an object with instanceType\n");
327                 return LDB_ERR_UNWILLING_TO_PERFORM;
328         }
329
330         /* Get a sequence number from the backend */
331         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
332         if (ret != LDB_SUCCESS) {
333                 return ret;
334         }
335
336         /* a new GUID */
337         guid = GUID_random();
338
339         /* get our invicationId */
340         our_invocation_id = samdb_ntds_invocation_id(module->ldb);
341         if (!our_invocation_id) {
342                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
343                               "replmd_add_originating: unable to find invocationId\n");
344                 return LDB_ERR_OPERATIONS_ERROR;
345         }
346
347         /* create a copy of the request */
348         down_req = talloc(req, struct ldb_request);
349         if (down_req == NULL) {
350                 ldb_oom(module->ldb);
351                 return LDB_ERR_OPERATIONS_ERROR;
352         }
353         *down_req = *req;
354
355         /* we have to copy the message as the caller might have it as a const */
356         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
357         if (msg == NULL) {
358                 talloc_free(down_req);
359                 ldb_oom(module->ldb);
360                 return LDB_ERR_OPERATIONS_ERROR;
361         }
362
363         /* generated times */
364         unix_to_nt_time(&now, t);
365         time_str = ldb_timestring(msg, t);
366         if (!time_str) {
367                 talloc_free(down_req);
368                 return LDB_ERR_OPERATIONS_ERROR;
369         }
370
371         /*
372          * get details of the rdn name
373          */
374         rdn_name        = ldb_dn_get_rdn_name(msg->dn);
375         if (!rdn_name) {
376                 talloc_free(down_req);
377                 ldb_oom(module->ldb);
378                 return LDB_ERR_OPERATIONS_ERROR;
379         }
380         rdn_attr        = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
381         if (!rdn_attr) {
382                 talloc_free(down_req);
383                 return LDB_ERR_OPERATIONS_ERROR;
384         }
385         rdn_value       = ldb_dn_get_rdn_val(msg->dn);
386         if (!rdn_value) {
387                 talloc_free(down_req);
388                 ldb_oom(module->ldb);
389                 return LDB_ERR_OPERATIONS_ERROR;
390         }
391
392         /* 
393          * remove autogenerated attributes
394          */
395         ldb_msg_remove_attr(msg, rdn_name);
396         ldb_msg_remove_attr(msg, "name");
397         ldb_msg_remove_attr(msg, "whenCreated");
398         ldb_msg_remove_attr(msg, "whenChanged");
399         ldb_msg_remove_attr(msg, "uSNCreated");
400         ldb_msg_remove_attr(msg, "uSNChanged");
401         ldb_msg_remove_attr(msg, "replPropertyMetaData");
402
403         /*
404          * TODO: construct a new DN out of:
405          *       - the parent DN
406          *       - the upper case of rdn_attr->LDAPDisplayName
407          *       - rdn_value
408          */
409         new_dn = ldb_dn_copy(msg, msg->dn);
410         if (!new_dn) {
411                 talloc_free(down_req);
412                 ldb_oom(module->ldb);
413                 return LDB_ERR_OPERATIONS_ERROR;
414         }
415         rdn_name_upper = strupper_talloc(msg, rdn_attr->lDAPDisplayName);
416         if (!rdn_name_upper) {
417                 talloc_free(down_req);
418                 ldb_oom(module->ldb);
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421         ret = ldb_dn_set_component(new_dn, 0, rdn_name_upper, *rdn_value);
422         if (ret != LDB_SUCCESS) {
423                 talloc_free(down_req);
424                 ldb_oom(module->ldb);
425                 return LDB_ERR_OPERATIONS_ERROR;
426         }
427         msg->dn = new_dn;
428
429         /*
430          * TODO: calculate correct instance type
431          */
432         instance_type = INSTANCE_TYPE_WRITE;
433         if (ldb_dn_compare(partition->dn, msg->dn) == 0) {
434                 instance_type |= INSTANCE_TYPE_IS_NC_HEAD;
435                 if (ldb_dn_compare(msg->dn, samdb_base_dn(module->ldb)) != 0) {
436                         instance_type |= INSTANCE_TYPE_NC_ABOVE;
437                 }
438         }
439
440         /*
441          * readd replicated attributes
442          */
443         ret = ldb_msg_add_value(msg, rdn_attr->lDAPDisplayName, rdn_value, NULL);
444         if (ret != LDB_SUCCESS) {
445                 talloc_free(down_req);
446                 ldb_oom(module->ldb);
447                 return LDB_ERR_OPERATIONS_ERROR;
448         }
449         ret = ldb_msg_add_value(msg, "name", rdn_value, NULL);
450         if (ret != LDB_SUCCESS) {
451                 talloc_free(down_req);
452                 ldb_oom(module->ldb);
453                 return LDB_ERR_OPERATIONS_ERROR;
454         }
455         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
456         if (ret != LDB_SUCCESS) {
457                 talloc_free(down_req);
458                 ldb_oom(module->ldb);
459                 return LDB_ERR_OPERATIONS_ERROR;
460         }
461         ret = ldb_msg_add_fmt(msg, "instanceType", "%u", instance_type);
462         if (ret != LDB_SUCCESS) {
463                 talloc_free(down_req);
464                 ldb_oom(module->ldb);
465                 return LDB_ERR_OPERATIONS_ERROR;
466         }
467
468         /* build the replication meta_data */
469         ZERO_STRUCT(nmd);
470         nmd.version             = 1;
471         nmd.ctr.ctr1.count      = msg->num_elements;
472         nmd.ctr.ctr1.array      = talloc_array(msg,
473                                                struct replPropertyMetaData1,
474                                                nmd.ctr.ctr1.count);
475         if (!nmd.ctr.ctr1.array) {
476                 talloc_free(down_req);
477                 ldb_oom(module->ldb);
478                 return LDB_ERR_OPERATIONS_ERROR;
479         }
480
481         for (i=0; i < msg->num_elements; i++) {
482                 struct ldb_message_element *e = &msg->elements[i];
483                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
484                 const struct dsdb_attribute *sa;
485
486                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
487                 if (!sa) {
488                         ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
489                                       "replmd_add_originating: attribute '%s' not defined in schema\n",
490                                       e->name);
491                         talloc_free(down_req);
492                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
493                 }
494
495                 if (sa->systemFlags & 0x00000001) {
496                         /* attribute is not replicated so it has no meta data */
497                         continue;
498                 }
499
500                 m->attid                        = sa->attributeID_id;
501                 m->version                      = 1;
502                 m->orginating_time              = now;
503                 m->orginating_invocation_id     = *our_invocation_id;
504                 m->orginating_usn               = seq_num;
505                 m->local_usn                    = seq_num;
506                 ni++;
507         }
508
509         /* fix meta data count */
510         nmd.ctr.ctr1.count = ni;
511
512         /*
513          * sort meta data array, and move the rdn attribute entry to the end
514          */
515         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
516
517         /* generated NDR encoded values */
518         nt_status = ndr_push_struct_blob(&guid_value, msg, &guid, 
519                                          (ndr_push_flags_fn_t)ndr_push_GUID);
520         if (!NT_STATUS_IS_OK(nt_status)) {
521                 talloc_free(down_req);
522                 ldb_oom(module->ldb);
523                 return LDB_ERR_OPERATIONS_ERROR;
524         }
525         nt_status = ndr_push_struct_blob(&nmd_value, msg, &nmd,
526                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
527         if (!NT_STATUS_IS_OK(nt_status)) {
528                 talloc_free(down_req);
529                 ldb_oom(module->ldb);
530                 return LDB_ERR_OPERATIONS_ERROR;
531         }
532
533         /*
534          * add the autogenerated values
535          */
536         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
537         if (ret != LDB_SUCCESS) {
538                 talloc_free(down_req);
539                 ldb_oom(module->ldb);
540                 return LDB_ERR_OPERATIONS_ERROR;
541         }
542         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
543         if (ret != LDB_SUCCESS) {
544                 talloc_free(down_req);
545                 ldb_oom(module->ldb);
546                 return LDB_ERR_OPERATIONS_ERROR;
547         }
548         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNCreated", seq_num);
549         if (ret != LDB_SUCCESS) {
550                 talloc_free(down_req);
551                 ldb_oom(module->ldb);
552                 return LDB_ERR_OPERATIONS_ERROR;
553         }
554         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNChanged", seq_num);
555         if (ret != LDB_SUCCESS) {
556                 talloc_free(down_req);
557                 ldb_oom(module->ldb);
558                 return LDB_ERR_OPERATIONS_ERROR;
559         }
560         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
561         if (ret != LDB_SUCCESS) {
562                 talloc_free(down_req);
563                 ldb_oom(module->ldb);
564                 return LDB_ERR_OPERATIONS_ERROR;
565         }
566
567         /*
568          * sort the attributes by attid before storing the object
569          */
570         replmd_ldb_message_sort(msg, schema);
571
572         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
573
574         /* go on with the call chain */
575         ret = ldb_next_request(module, down_req);
576
577         /* do not free down_req as the call results may be linked to it,
578          * it will be freed when the upper level request get freed */
579         if (ret == LDB_SUCCESS) {
580                 req->handle = down_req->handle;
581         }
582
583         return ret;
584 }
585
586 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
587 {
588         return replmd_prepare_originating(module, req, req->op.add.message->dn,
589                                           "replmd_add", replmd_add_originating);
590 }
591
592 static int replmd_modify_originating(struct ldb_module *module,
593                                      struct ldb_request *req,
594                                      const struct dsdb_schema *schema,
595                                      const struct dsdb_control_current_partition *partition)
596 {
597         struct ldb_request *down_req;
598         struct ldb_message *msg;
599         int ret;
600         time_t t = time(NULL);
601         uint64_t seq_num;
602
603         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
604
605         down_req = talloc(req, struct ldb_request);
606         if (down_req == NULL) {
607                 return LDB_ERR_OPERATIONS_ERROR;
608         }
609
610         *down_req = *req;
611
612         /* we have to copy the message as the caller might have it as a const */
613         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
614         if (msg == NULL) {
615                 talloc_free(down_req);
616                 return LDB_ERR_OPERATIONS_ERROR;
617         }
618
619         if (add_time_element(msg, "whenChanged", t) != 0) {
620                 talloc_free(down_req);
621                 return LDB_ERR_OPERATIONS_ERROR;
622         }
623
624         /* Get a sequence number from the backend */
625         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
626         if (ret == LDB_SUCCESS) {
627                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
628                         talloc_free(down_req);
629                         return LDB_ERR_OPERATIONS_ERROR;
630                 }
631         }
632
633         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
634
635         /* go on with the call chain */
636         ret = ldb_next_request(module, down_req);
637
638         /* do not free down_req as the call results may be linked to it,
639          * it will be freed when the upper level request get freed */
640         if (ret == LDB_SUCCESS) {
641                 req->handle = down_req->handle;
642         }
643
644         return ret;
645 }
646
647 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
648 {
649         return replmd_prepare_originating(module, req, req->op.mod.message->dn,
650                                           "replmd_modify", replmd_modify_originating);
651 }
652
653 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
654 {
655         struct ldb_reply *ares = NULL;
656
657         ar->handle->status = ret;
658         ar->handle->state = LDB_ASYNC_DONE;
659
660         if (!ar->orig_req->callback) {
661                 return LDB_SUCCESS;
662         }
663         
664         /* we're done and need to report the success to the caller */
665         ares = talloc_zero(ar, struct ldb_reply);
666         if (!ares) {
667                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
668                 ar->handle->state = LDB_ASYNC_DONE;
669                 return LDB_ERR_OPERATIONS_ERROR;
670         }
671
672         ares->type      = LDB_REPLY_EXTENDED;
673         ares->response  = NULL;
674
675         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
676 }
677
678 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
679 {
680         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
681 }
682
683 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
684 {
685         return replmd_replicated_request_reply_helper(ar, ret);
686 }
687
688 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
689 {
690         int ret = LDB_ERR_OTHER;
691         /* TODO: do some error mapping */
692         return replmd_replicated_request_reply_helper(ar, ret);
693 }
694
695 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
696
697 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
698                                                 void *private_data,
699                                                 struct ldb_reply *ares)
700 {
701 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
702         struct replmd_replicated_request *ar = talloc_get_type(private_data,
703                                                struct replmd_replicated_request);
704
705         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
706         if (ar->sub.change_ret != LDB_SUCCESS) {
707                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
708         }
709
710         talloc_free(ar->sub.mem_ctx);
711         ZERO_STRUCT(ar->sub);
712
713         ar->index_current++;
714
715         return replmd_replicated_apply_next(ar);
716 #else
717         return LDB_SUCCESS;
718 #endif
719 }
720
721 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
722 {
723         NTSTATUS nt_status;
724         struct ldb_message *msg;
725         struct replPropertyMetaDataBlob *md;
726         struct ldb_val md_value;
727         uint32_t i;
728         uint64_t seq_num;
729         int ret;
730
731         /*
732          * TODO: check if the parent object exist
733          */
734
735         /*
736          * TODO: handle the conflict case where an object with the
737          *       same name exist
738          */
739
740         msg = ar->objs->objects[ar->index_current].msg;
741         md = ar->objs->objects[ar->index_current].meta_data;
742
743         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
744         if (ret != LDB_SUCCESS) {
745                 return replmd_replicated_request_error(ar, ret);
746         }
747
748         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
749         if (ret != LDB_SUCCESS) {
750                 return replmd_replicated_request_error(ar, ret);
751         }
752
753         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
754         if (ret != LDB_SUCCESS) {
755                 return replmd_replicated_request_error(ar, ret);
756         }
757
758         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
759         if (ret != LDB_SUCCESS) {
760                 return replmd_replicated_request_error(ar, ret);
761         }
762
763         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
764         if (ret != LDB_SUCCESS) {
765                 return replmd_replicated_request_error(ar, ret);
766         }
767
768         /*
769          * the meta data array is already sorted by the caller
770          */
771         for (i=0; i < md->ctr.ctr1.count; i++) {
772                 md->ctr.ctr1.array[i].local_usn = seq_num;
773         }
774         nt_status = ndr_push_struct_blob(&md_value, msg, md,
775                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
776         if (!NT_STATUS_IS_OK(nt_status)) {
777                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
778         }
779         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
780         if (ret != LDB_SUCCESS) {
781                 return replmd_replicated_request_error(ar, ret);
782         }
783
784         replmd_ldb_message_sort(msg, ar->schema);
785
786         ret = ldb_build_add_req(&ar->sub.change_req,
787                                 ar->module->ldb,
788                                 ar->sub.mem_ctx,
789                                 msg,
790                                 NULL,
791                                 ar,
792                                 replmd_replicated_apply_add_callback);
793         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
794
795 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
796         return ldb_next_request(ar->module, ar->sub.change_req);
797 #else
798         ret = ldb_next_request(ar->module, ar->sub.change_req);
799         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
800
801         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
802         if (ar->sub.change_ret != LDB_SUCCESS) {
803                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
804         }
805
806         talloc_free(ar->sub.mem_ctx);
807         ZERO_STRUCT(ar->sub);
808
809         ar->index_current++;
810
811         return LDB_SUCCESS;
812 #endif
813 }
814
815 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
816                                                          struct replPropertyMetaData1 *m2)
817 {
818         int ret;
819
820         if (m1->version != m2->version) {
821                 return m1->version - m2->version;
822         }
823
824         if (m1->orginating_time != m2->orginating_time) {
825                 return m1->orginating_time - m2->orginating_time;
826         }
827
828         ret = GUID_compare(&m1->orginating_invocation_id, &m2->orginating_invocation_id);
829         if (ret != 0) {
830                 return ret;
831         }
832
833         return m1->orginating_usn - m2->orginating_usn;
834 }
835
836 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
837                                                   void *private_data,
838                                                   struct ldb_reply *ares)
839 {
840 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
841         struct replmd_replicated_request *ar = talloc_get_type(private_data,
842                                                struct replmd_replicated_request);
843
844         ret = ldb_next_request(ar->module, ar->sub.change_req);
845         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
846
847         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
848         if (ar->sub.change_ret != LDB_SUCCESS) {
849                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
850         }
851
852         talloc_free(ar->sub.mem_ctx);
853         ZERO_STRUCT(ar->sub);
854
855         ar->index_current++;
856
857         return LDB_SUCCESS;
858 #else
859         return LDB_SUCCESS;
860 #endif
861 }
862
863 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
864 {
865         NTSTATUS nt_status;
866         struct ldb_message *msg;
867         struct replPropertyMetaDataBlob *rmd;
868         struct replPropertyMetaDataBlob omd;
869         const struct ldb_val *omd_value;
870         struct replPropertyMetaDataBlob nmd;
871         struct ldb_val nmd_value;
872         uint32_t i,j,ni=0;
873         uint32_t removed_attrs = 0;
874         uint64_t seq_num;
875         int ret;
876
877         msg = ar->objs->objects[ar->index_current].msg;
878         rmd = ar->objs->objects[ar->index_current].meta_data;
879         ZERO_STRUCT(omd);
880         omd.version = 1;
881
882         /*
883          * TODO: add rename conflict handling
884          */
885         if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
886                 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
887                               ar->index_current);
888                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
889                           ldb_dn_get_linearized(ar->sub.search_msg->dn),
890                           ldb_dn_get_linearized(msg->dn));
891                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
892         }
893
894         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
895         if (ret != LDB_SUCCESS) {
896                 return replmd_replicated_request_error(ar, ret);
897         }
898
899         /* find existing meta data */
900         omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
901         if (omd_value) {
902                 nt_status = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, &omd,
903                                                  (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
904                 if (!NT_STATUS_IS_OK(nt_status)) {
905                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
906                 }
907
908                 if (omd.version != 1) {
909                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
910                 }
911         }
912
913         ZERO_STRUCT(nmd);
914         nmd.version = 1;
915         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
916         nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
917                                           struct replPropertyMetaData1,
918                                           nmd.ctr.ctr1.count);
919         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
920
921         /* first copy the old meta data */
922         for (i=0; i < omd.ctr.ctr1.count; i++) {
923                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
924                 ni++;
925         }
926
927         /* now merge in the new meta data */
928         for (i=0; i < rmd->ctr.ctr1.count; i++) {
929                 bool found = false;
930
931                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
932
933                 for (j=0; j < ni; j++) {
934                         int cmp;
935
936                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
937                                 continue;
938                         }
939
940                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
941                                                                             &nmd.ctr.ctr1.array[j]);
942                         if (cmp > 0) {
943                                 /* replace the entry */
944                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
945                                 found = true;
946                                 break;
947                         }
948
949                         /* we don't want to apply this change so remove the attribute */
950                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
951                         removed_attrs++;
952
953                         found = true;
954                         break;
955                 }
956
957                 if (found) continue;
958
959                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
960                 ni++;
961         }
962
963         /*
964          * finally correct the size of the meta_data array
965          */
966         nmd.ctr.ctr1.count = ni;
967
968         /*
969          * the rdn attribute (the alias for the name attribute),
970          * 'cn' for most objects is the last entry in the meta data array
971          * we have stored
972          *
973          * sort the new meta data array
974          */
975         {
976                 struct replPropertyMetaData1 *rdn_p;
977                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
978
979                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
980                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
981         }
982
983         /* create the meta data value */
984         nt_status = ndr_push_struct_blob(&nmd_value, msg, &nmd,
985                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
986         if (!NT_STATUS_IS_OK(nt_status)) {
987                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
988         }
989
990         /*
991          * check if some replicated attributes left, otherwise skip the ldb_modify() call
992          */
993         if (msg->num_elements == 0) {
994                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
995                           ar->index_current);
996                 goto next_object;
997         }
998
999         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1000                   ar->index_current, msg->num_elements);
1001
1002         /*
1003          * when we now that we'll modify the record, add the whenChanged, uSNChanged
1004          * and replPopertyMetaData attributes
1005          */
1006         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1007         if (ret != LDB_SUCCESS) {
1008                 return replmd_replicated_request_error(ar, ret);
1009         }
1010         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
1011         if (ret != LDB_SUCCESS) {
1012                 return replmd_replicated_request_error(ar, ret);
1013         }
1014         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1015         if (ret != LDB_SUCCESS) {
1016                 return replmd_replicated_request_error(ar, ret);
1017         }
1018
1019         replmd_ldb_message_sort(msg, ar->schema);
1020
1021         /* we want to replace the old values */
1022         for (i=0; i < msg->num_elements; i++) {
1023                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1024         }
1025
1026         ret = ldb_build_mod_req(&ar->sub.change_req,
1027                                 ar->module->ldb,
1028                                 ar->sub.mem_ctx,
1029                                 msg,
1030                                 NULL,
1031                                 ar,
1032                                 replmd_replicated_apply_merge_callback);
1033         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1034
1035 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1036         return ldb_next_request(ar->module, ar->sub.change_req);
1037 #else
1038         ret = ldb_next_request(ar->module, ar->sub.change_req);
1039         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1040
1041         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1042         if (ar->sub.change_ret != LDB_SUCCESS) {
1043                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1044         }
1045
1046 next_object:
1047         talloc_free(ar->sub.mem_ctx);
1048         ZERO_STRUCT(ar->sub);
1049
1050         ar->index_current++;
1051
1052         return LDB_SUCCESS;
1053 #endif
1054 }
1055
1056 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
1057                                                    void *private_data,
1058                                                    struct ldb_reply *ares)
1059 {
1060         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1061                                                struct replmd_replicated_request);
1062         bool is_done = false;
1063
1064         switch (ares->type) {
1065         case LDB_REPLY_ENTRY:
1066                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1067                 break;
1068         case LDB_REPLY_REFERRAL:
1069                 /* we ignore referrals */
1070                 break;
1071         case LDB_REPLY_EXTENDED:
1072         case LDB_REPLY_DONE:
1073                 is_done = true;
1074         }
1075
1076         talloc_free(ares);
1077
1078 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1079         if (is_done) {
1080                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1081                 if (ar->sub.search_ret != LDB_SUCCESS) {
1082                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1083                 }
1084                 if (ar->sub.search_msg) {
1085                         return replmd_replicated_apply_merge(ar);
1086                 }
1087                 return replmd_replicated_apply_add(ar);
1088         }
1089 #endif
1090         return LDB_SUCCESS;
1091 }
1092
1093 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
1094 {
1095         int ret;
1096         char *tmp_str;
1097         char *filter;
1098
1099         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
1100         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1101
1102         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
1103         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1104         talloc_free(tmp_str);
1105
1106         ret = ldb_build_search_req(&ar->sub.search_req,
1107                                    ar->module->ldb,
1108                                    ar->sub.mem_ctx,
1109                                    ar->objs->partition_dn,
1110                                    LDB_SCOPE_SUBTREE,
1111                                    filter,
1112                                    NULL,
1113                                    NULL,
1114                                    ar,
1115                                    replmd_replicated_apply_search_callback);
1116         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1117
1118 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1119         return ldb_next_request(ar->module, ar->sub.search_req);
1120 #else
1121         ret = ldb_next_request(ar->module, ar->sub.search_req);
1122         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1123
1124         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1125         if (ar->sub.search_ret != LDB_SUCCESS) {
1126                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1127         }
1128         if (ar->sub.search_msg) {
1129                 return replmd_replicated_apply_merge(ar);
1130         }
1131
1132         return replmd_replicated_apply_add(ar);
1133 #endif
1134 }
1135
1136 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1137 {
1138 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1139         if (ar->index_current >= ar->objs->num_objects) {
1140                 return replmd_replicated_uptodate_vector(ar);
1141         }
1142 #endif
1143
1144         ar->sub.mem_ctx = talloc_new(ar);
1145         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1146
1147         return replmd_replicated_apply_search(ar);
1148 }
1149
1150 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
1151                                                       void *private_data,
1152                                                       struct ldb_reply *ares)
1153 {
1154 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1155         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1156                                                struct replmd_replicated_request);
1157
1158         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1159         if (ar->sub.change_ret != LDB_SUCCESS) {
1160                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1161         }
1162
1163         talloc_free(ar->sub.mem_ctx);
1164         ZERO_STRUCT(ar->sub);
1165
1166         return replmd_replicated_request_done(ar);
1167 #else
1168         return LDB_SUCCESS;
1169 #endif
1170 }
1171
1172 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1173                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1174 {
1175         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1176 }
1177
1178 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1179 {
1180         NTSTATUS nt_status;
1181         struct ldb_message *msg;
1182         struct replUpToDateVectorBlob ouv;
1183         const struct ldb_val *ouv_value;
1184         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1185         struct replUpToDateVectorBlob nuv;
1186         struct ldb_val nuv_value;
1187         struct ldb_message_element *nuv_el = NULL;
1188         const struct GUID *our_invocation_id;
1189         struct ldb_message_element *orf_el = NULL;
1190         struct repsFromToBlob nrf;
1191         struct ldb_val *nrf_value = NULL;
1192         struct ldb_message_element *nrf_el = NULL;
1193         uint32_t i,j,ni=0;
1194         uint64_t seq_num;
1195         bool found = false;
1196         time_t t = time(NULL);
1197         NTTIME now;
1198         int ret;
1199
1200         ruv = ar->objs->uptodateness_vector;
1201         ZERO_STRUCT(ouv);
1202         ouv.version = 2;
1203         ZERO_STRUCT(nuv);
1204         nuv.version = 2;
1205
1206         unix_to_nt_time(&now, t);
1207
1208         /* 
1209          * we use the next sequence number for our own highest_usn
1210          * because we will do a modify request and this will increment
1211          * our highest_usn
1212          */
1213         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
1214         if (ret != LDB_SUCCESS) {
1215                 return replmd_replicated_request_error(ar, ret);
1216         }
1217
1218         /*
1219          * first create the new replUpToDateVector
1220          */
1221         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
1222         if (ouv_value) {
1223                 nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
1224                                                  (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1225                 if (!NT_STATUS_IS_OK(nt_status)) {
1226                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1227                 }
1228
1229                 if (ouv.version != 2) {
1230                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1231                 }
1232         }
1233
1234         /*
1235          * the new uptodateness vector will at least
1236          * contain 2 entries, one for the source_dsa and one the local server
1237          *
1238          * plus optional values from our old vector and the one from the source_dsa
1239          */
1240         nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
1241         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1242         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
1243                                             struct drsuapi_DsReplicaCursor2,
1244                                             nuv.ctr.ctr2.count);
1245         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1246
1247         /* first copy the old vector */
1248         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1249                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1250                 ni++;
1251         }
1252
1253         /* merge in the source_dsa vector is available */
1254         for (i=0; (ruv && i < ruv->count); i++) {
1255                 found = false;
1256
1257                 for (j=0; j < ni; j++) {
1258                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1259                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1260                                 continue;
1261                         }
1262
1263                         found = true;
1264
1265                         /*
1266                          * we update only the highest_usn and not the latest_sync_success time,
1267                          * because the last success stands for direct replication
1268                          */
1269                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1270                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1271                         }
1272                         break;                  
1273                 }
1274
1275                 if (found) continue;
1276
1277                 /* if it's not there yet, add it */
1278                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1279                 ni++;
1280         }
1281
1282         /*
1283          * merge in the current highwatermark for the source_dsa
1284          */
1285         found = false;
1286         for (j=0; j < ni; j++) {
1287                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1288                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1289                         continue;
1290                 }
1291
1292                 found = true;
1293
1294                 /*
1295                  * here we update the highest_usn and last_sync_success time
1296                  * because we're directly replicating from the source_dsa
1297                  *
1298                  * and use the tmp_highest_usn because this is what we have just applied
1299                  * to our ldb
1300                  */
1301                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1302                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1303                 break;
1304         }
1305         if (!found) {
1306                 /*
1307                  * here we update the highest_usn and last_sync_success time
1308                  * because we're directly replicating from the source_dsa
1309                  *
1310                  * and use the tmp_highest_usn because this is what we have just applied
1311                  * to our ldb
1312                  */
1313                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1314                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1315                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1316                 ni++;
1317         }
1318
1319         /*
1320          * merge our own current values if we have a invocation_id already
1321          * attached to the ldb
1322          */
1323         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1324         if (our_invocation_id) {
1325                 found = false;
1326                 for (j=0; j < ni; j++) {
1327                         if (!GUID_equal(our_invocation_id,
1328                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1329                                 continue;
1330                         }
1331
1332                         found = true;
1333
1334                         /*
1335                          * here we update the highest_usn and last_sync_success time
1336                          * because it's our own entry
1337                          */
1338                         nuv.ctr.ctr2.cursors[j].highest_usn             = seq_num;
1339                         nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1340                         break;
1341                 }
1342                 if (!found) {
1343                         /*
1344                          * here we update the highest_usn and last_sync_success time
1345                          * because it's our own entry
1346                          */
1347                         nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
1348                         nuv.ctr.ctr2.cursors[ni].highest_usn            = seq_num;
1349                         nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1350                         ni++;
1351                 }
1352         }
1353
1354         /*
1355          * finally correct the size of the cursors array
1356          */
1357         nuv.ctr.ctr2.count = ni;
1358
1359         /*
1360          * sort the cursors
1361          */
1362         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1363               sizeof(struct drsuapi_DsReplicaCursor2),
1364               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1365
1366         /*
1367          * create the change ldb_message
1368          */
1369         msg = ldb_msg_new(ar->sub.mem_ctx);
1370         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1371         msg->dn = ar->sub.search_msg->dn;
1372
1373         nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
1374                                          (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1375         if (!NT_STATUS_IS_OK(nt_status)) {
1376                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1377         }
1378         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1379         if (ret != LDB_SUCCESS) {
1380                 return replmd_replicated_request_error(ar, ret);
1381         }
1382         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1383
1384         /*
1385          * now create the new repsFrom value from the given repsFromTo1 structure
1386          */
1387         ZERO_STRUCT(nrf);
1388         nrf.version                                     = 1;
1389         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1390         /* and fix some values... */
1391         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1392         nrf.ctr.ctr1.last_success                       = now;
1393         nrf.ctr.ctr1.last_attempt                       = now;
1394         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1395         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1396
1397         /*
1398          * first see if we already have a repsFrom value for the current source dsa
1399          * if so we'll later replace this value
1400          */
1401         orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1402         if (orf_el) {
1403                 for (i=0; i < orf_el->num_values; i++) {
1404                         struct repsFromToBlob *trf;
1405
1406                         trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1407                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1408
1409                         nt_status = ndr_pull_struct_blob(&orf_el->values[i], trf, trf,
1410                                                          (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1411                         if (!NT_STATUS_IS_OK(nt_status)) {
1412                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1413                         }
1414
1415                         if (trf->version != 1) {
1416                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1417                         }
1418
1419                         /*
1420                          * we compare the source dsa objectGUID not the invocation_id
1421                          * because we want only one repsFrom value per source dsa
1422                          * and when the invocation_id of the source dsa has changed we don't need 
1423                          * the old repsFrom with the old invocation_id
1424                          */
1425                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1426                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1427                                 talloc_free(trf);
1428                                 continue;
1429                         }
1430
1431                         talloc_free(trf);
1432                         nrf_value = &orf_el->values[i];
1433                         break;
1434                 }
1435
1436                 /*
1437                  * copy over all old values to the new ldb_message
1438                  */
1439                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1440                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1441                 *nrf_el = *orf_el;
1442         }
1443
1444         /*
1445          * if we haven't found an old repsFrom value for the current source dsa
1446          * we'll add a new value
1447          */
1448         if (!nrf_value) {
1449                 struct ldb_val zero_value;
1450                 ZERO_STRUCT(zero_value);
1451                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1452                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1453
1454                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1455         }
1456
1457         /* we now fill the value which is already attached to ldb_message */
1458         nt_status = ndr_push_struct_blob(nrf_value, msg, &nrf,
1459                                          (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1460         if (!NT_STATUS_IS_OK(nt_status)) {
1461                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1462         }
1463
1464         /* 
1465          * the ldb_message_element for the attribute, has all the old values and the new one
1466          * so we'll replace the whole attribute with all values
1467          */
1468         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1469
1470         /* prepare the ldb_modify() request */
1471         ret = ldb_build_mod_req(&ar->sub.change_req,
1472                                 ar->module->ldb,
1473                                 ar->sub.mem_ctx,
1474                                 msg,
1475                                 NULL,
1476                                 ar,
1477                                 replmd_replicated_uptodate_modify_callback);
1478         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1479
1480 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1481         return ldb_next_request(ar->module, ar->sub.change_req);
1482 #else
1483         ret = ldb_next_request(ar->module, ar->sub.change_req);
1484         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1485
1486         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1487         if (ar->sub.change_ret != LDB_SUCCESS) {
1488                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1489         }
1490
1491         talloc_free(ar->sub.mem_ctx);
1492         ZERO_STRUCT(ar->sub);
1493
1494         return replmd_replicated_request_done(ar);
1495 #endif
1496 }
1497
1498 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1499                                                       void *private_data,
1500                                                       struct ldb_reply *ares)
1501 {
1502         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1503                                                struct replmd_replicated_request);
1504         bool is_done = false;
1505
1506         switch (ares->type) {
1507         case LDB_REPLY_ENTRY:
1508                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1509                 break;
1510         case LDB_REPLY_REFERRAL:
1511                 /* we ignore referrals */
1512                 break;
1513         case LDB_REPLY_EXTENDED:
1514         case LDB_REPLY_DONE:
1515                 is_done = true;
1516         }
1517
1518         talloc_free(ares);
1519
1520 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1521         if (is_done) {
1522                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1523                 if (ar->sub.search_ret != LDB_SUCCESS) {
1524                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1525                 }
1526                 if (!ar->sub.search_msg) {
1527                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1528                 }
1529
1530                 return replmd_replicated_uptodate_modify(ar);
1531         }
1532 #endif
1533         return LDB_SUCCESS;
1534 }
1535
1536 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1537 {
1538         int ret;
1539         static const char *attrs[] = {
1540                 "replUpToDateVector",
1541                 "repsFrom",
1542                 NULL
1543         };
1544
1545         ret = ldb_build_search_req(&ar->sub.search_req,
1546                                    ar->module->ldb,
1547                                    ar->sub.mem_ctx,
1548                                    ar->objs->partition_dn,
1549                                    LDB_SCOPE_BASE,
1550                                    "(objectClass=*)",
1551                                    attrs,
1552                                    NULL,
1553                                    ar,
1554                                    replmd_replicated_uptodate_search_callback);
1555         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1556
1557 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1558         return ldb_next_request(ar->module, ar->sub.search_req);
1559 #else
1560         ret = ldb_next_request(ar->module, ar->sub.search_req);
1561         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1562
1563         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1564         if (ar->sub.search_ret != LDB_SUCCESS) {
1565                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1566         }
1567         if (!ar->sub.search_msg) {
1568                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1569         }
1570
1571         return replmd_replicated_uptodate_modify(ar);
1572 #endif
1573 }
1574
1575 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1576 {
1577         ar->sub.mem_ctx = talloc_new(ar);
1578         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1579
1580         return replmd_replicated_uptodate_search(ar);
1581 }
1582
1583 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1584 {
1585         struct dsdb_extended_replicated_objects *objs;
1586         struct replmd_replicated_request *ar;
1587
1588         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1589
1590         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1591         if (!objs) {
1592                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1593                 return LDB_ERR_PROTOCOL_ERROR;
1594         }
1595
1596         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1597                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1598                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1599                 return LDB_ERR_PROTOCOL_ERROR;
1600         }
1601
1602         ar = replmd_replicated_init_handle(module, req, objs);
1603         if (!ar) {
1604                 return LDB_ERR_OPERATIONS_ERROR;
1605         }
1606
1607 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1608         return replmd_replicated_apply_next(ar);
1609 #else
1610         while (ar->index_current < ar->objs->num_objects &&
1611                req->handle->state != LDB_ASYNC_DONE) { 
1612                 replmd_replicated_apply_next(ar);
1613         }
1614
1615         if (req->handle->state != LDB_ASYNC_DONE) {
1616                 replmd_replicated_uptodate_vector(ar);
1617         }
1618
1619         return LDB_SUCCESS;
1620 #endif
1621 }
1622
1623 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1624 {
1625         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1626                 return replmd_extended_replicated_objects(module, req);
1627         }
1628
1629         return ldb_next_request(module, req);
1630 }
1631
1632 static int replmd_wait_none(struct ldb_handle *handle) {
1633         struct replmd_replicated_request *ar;
1634     
1635         if (!handle || !handle->private_data) {
1636                 return LDB_ERR_OPERATIONS_ERROR;
1637         }
1638
1639         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1640         if (!ar) {
1641                 return LDB_ERR_OPERATIONS_ERROR;
1642         }
1643
1644         /* we do only sync calls */
1645         if (handle->state != LDB_ASYNC_DONE) {
1646                 return LDB_ERR_OPERATIONS_ERROR;
1647         }
1648
1649         return handle->status;
1650 }
1651
1652 static int replmd_wait_all(struct ldb_handle *handle) {
1653
1654         int ret;
1655
1656         while (handle->state != LDB_ASYNC_DONE) {
1657                 ret = replmd_wait_none(handle);
1658                 if (ret != LDB_SUCCESS) {
1659                         return ret;
1660                 }
1661         }
1662
1663         return handle->status;
1664 }
1665
1666 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1667 {
1668         if (type == LDB_WAIT_ALL) {
1669                 return replmd_wait_all(handle);
1670         } else {
1671                 return replmd_wait_none(handle);
1672         }
1673 }
1674
1675 static const struct ldb_module_ops replmd_ops = {
1676         .name          = "repl_meta_data",
1677         .add           = replmd_add,
1678         .modify        = replmd_modify,
1679         .extended      = replmd_extended,
1680         .wait          = replmd_wait
1681 };
1682
1683 int repl_meta_data_module_init(void)
1684 {
1685         return ldb_register_module(&replmd_ops);
1686 }