r26639: librpc: Pass iconv convenience on from RPC connection to NDR library, so...
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb repl_meta_data module
31  *
32  *  Description: - add a unique objectGUID onto every new record,
33  *               - handle whenCreated, whenChanged timestamps
34  *               - handle uSNCreated, uSNChanged numbers
35  *               - handle replPropertyMetaData attribute
36  *
37  *  Author: Simo Sorce
38  *  Author: Stefan Metzmacher
39  */
40
41 #include "includes.h"
42 #include "lib/ldb/include/ldb.h"
43 #include "lib/ldb/include/ldb_errors.h"
44 #include "lib/ldb/include/ldb_private.h"
45 #include "dsdb/samdb/samdb.h"
46 #include "dsdb/common/flags.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50 #include "param/param.h"
51
52 struct replmd_replicated_request {
53         struct ldb_module *module;
54         struct ldb_handle *handle;
55         struct ldb_request *orig_req;
56
57         const struct dsdb_schema *schema;
58
59         struct dsdb_extended_replicated_objects *objs;
60
61         uint32_t index_current;
62
63         struct {
64                 TALLOC_CTX *mem_ctx;
65                 struct ldb_request *search_req;
66                 struct ldb_message *search_msg;
67                 int search_ret;
68                 struct ldb_request *change_req;
69                 int change_ret;
70         } sub;
71 };
72
73 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
74                                                                        struct ldb_request *req,
75                                                                        struct dsdb_extended_replicated_objects *objs)
76 {
77         struct replmd_replicated_request *ar;
78         struct ldb_handle *h;
79         const struct dsdb_schema *schema;
80
81         schema = dsdb_get_schema(module->ldb);
82         if (!schema) {
83                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
84                               "replmd_replicated_init_handle: no loaded schema found\n");
85                 return NULL;
86         }
87
88         h = talloc_zero(req, struct ldb_handle);
89         if (h == NULL) {
90                 ldb_set_errstring(module->ldb, "Out of Memory");
91                 return NULL;
92         }
93
94         h->module       = module;
95         h->state        = LDB_ASYNC_PENDING;
96         h->status       = LDB_SUCCESS;
97
98         ar = talloc_zero(h, struct replmd_replicated_request);
99         if (ar == NULL) {
100                 ldb_set_errstring(module->ldb, "Out of Memory");
101                 talloc_free(h);
102                 return NULL;
103         }
104
105         h->private_data = ar;
106
107         ar->module      = module;
108         ar->handle      = h;
109         ar->orig_req    = req;
110         ar->schema      = schema;
111         ar->objs        = objs;
112
113         req->handle = h;
114
115         return ar;
116 }
117
118 /*
119   add a time element to a record
120 */
121 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
122 {
123         struct ldb_message_element *el;
124         char *s;
125
126         if (ldb_msg_find_element(msg, attr) != NULL) {
127                 return 0;
128         }
129
130         s = ldb_timestring(msg, t);
131         if (s == NULL) {
132                 return -1;
133         }
134
135         if (ldb_msg_add_string(msg, attr, s) != 0) {
136                 return -1;
137         }
138
139         el = ldb_msg_find_element(msg, attr);
140         /* always set as replace. This works because on add ops, the flag
141            is ignored */
142         el->flags = LDB_FLAG_MOD_REPLACE;
143
144         return 0;
145 }
146
147 /*
148   add a uint64_t element to a record
149 */
150 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
151 {
152         struct ldb_message_element *el;
153
154         if (ldb_msg_find_element(msg, attr) != NULL) {
155                 return 0;
156         }
157
158         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
159                 return -1;
160         }
161
162         el = ldb_msg_find_element(msg, attr);
163         /* always set as replace. This works because on add ops, the flag
164            is ignored */
165         el->flags = LDB_FLAG_MOD_REPLACE;
166
167         return 0;
168 }
169
170 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
171                                                    const struct replPropertyMetaData1 *m2,
172                                                    const uint32_t *rdn_attid)
173 {
174         if (m1->attid == m2->attid) {
175                 return 0;
176         }
177
178         /*
179          * the rdn attribute should be at the end!
180          * so we need to return a value greater than zero
181          * which means m1 is greater than m2
182          */
183         if (m1->attid == *rdn_attid) {
184                 return 1;
185         }
186
187         /*
188          * the rdn attribute should be at the end!
189          * so we need to return a value less than zero
190          * which means m2 is greater than m1
191          */
192         if (m2->attid == *rdn_attid) {
193                 return -1;
194         }
195
196         return m1->attid - m2->attid;
197 }
198
199 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
200                                                  const uint32_t *rdn_attid)
201 {
202         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
203                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
204 }
205
206 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
207                                                  const struct ldb_message_element *e2,
208                                                  const struct dsdb_schema *schema)
209 {
210         const struct dsdb_attribute *a1;
211         const struct dsdb_attribute *a2;
212
213         /* 
214          * TODO: make this faster by caching the dsdb_attribute pointer
215          *       on the ldb_messag_element
216          */
217
218         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
219         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
220
221         /*
222          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
223          *       in the schema
224          */
225         if (!a1 || !a2) {
226                 return strcasecmp(e1->name, e2->name);
227         }
228
229         return a1->attributeID_id - a2->attributeID_id;
230 }
231
232 static void replmd_ldb_message_sort(struct ldb_message *msg,
233                                     const struct dsdb_schema *schema)
234 {
235         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
236                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
237 }
238
239 static int replmd_prepare_originating(struct ldb_module *module, struct ldb_request *req,
240                                       struct ldb_dn *dn, const char *fn_name,
241                                       int (*fn)(struct ldb_module *,
242                                                 struct ldb_request *,
243                                                 const struct dsdb_schema *,
244                                                 const struct dsdb_control_current_partition *))
245 {
246         const struct dsdb_schema *schema;
247         const struct ldb_control *partition_ctrl;
248         const struct dsdb_control_current_partition *partition;
249  
250         /* do not manipulate our control entries */
251         if (ldb_dn_is_special(dn)) {
252                 return ldb_next_request(module, req);
253         }
254
255         schema = dsdb_get_schema(module->ldb);
256         if (!schema) {
257                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
258                               "%s: no dsdb_schema loaded",
259                               fn_name);
260                 return LDB_ERR_CONSTRAINT_VIOLATION;
261         }
262
263         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
264         if (!partition_ctrl) {
265                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
266                               "%s: no current partition control found",
267                               fn_name);
268                 return LDB_ERR_CONSTRAINT_VIOLATION;
269         }
270
271         partition = talloc_get_type(partition_ctrl->data,
272                                     struct dsdb_control_current_partition);
273         if (!partition) {
274                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
275                               "%s: current partition control contains invalid data",
276                               fn_name);
277                 return LDB_ERR_CONSTRAINT_VIOLATION;
278         }
279
280         if (partition->version != DSDB_CONTROL_CURRENT_PARTITION_VERSION) {
281                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
282                               "%s: current partition control contains invalid version [%u != %u]\n",
283                               fn_name, partition->version, DSDB_CONTROL_CURRENT_PARTITION_VERSION);
284                 return LDB_ERR_CONSTRAINT_VIOLATION;
285         }
286
287         return fn(module, req, schema, partition);
288 }
289
290 static int replmd_add_originating(struct ldb_module *module,
291                                   struct ldb_request *req,
292                                   const struct dsdb_schema *schema,
293                                   const struct dsdb_control_current_partition *partition)
294 {
295         enum ndr_err_code ndr_err;
296         struct ldb_request *down_req;
297         struct ldb_message *msg;
298         uint32_t instance_type;
299         struct ldb_dn *new_dn;
300         const char *rdn_name;
301         const char *rdn_name_upper;
302         const struct ldb_val *rdn_value = NULL;
303         const struct dsdb_attribute *rdn_attr = NULL;
304         struct GUID guid;
305         struct ldb_val guid_value;
306         struct replPropertyMetaDataBlob nmd;
307         struct ldb_val nmd_value;
308         uint64_t seq_num;
309         const struct GUID *our_invocation_id;
310         time_t t = time(NULL);
311         NTTIME now;
312         char *time_str;
313         int ret;
314         uint32_t i, ni=0;
315
316         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
317
318         if (ldb_msg_find_element(req->op.add.message, "objectGUID")) {
319                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
320                               "replmd_add_originating: it's not allowed to add an object with objectGUID\n");
321                 return LDB_ERR_UNWILLING_TO_PERFORM;
322         }
323
324         if (ldb_msg_find_element(req->op.add.message, "instanceType")) {
325                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
326                               "replmd_add_originating: it's not allowed to add an object with instanceType\n");
327                 return LDB_ERR_UNWILLING_TO_PERFORM;
328         }
329
330         /* Get a sequence number from the backend */
331         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
332         if (ret != LDB_SUCCESS) {
333                 return ret;
334         }
335
336         /* a new GUID */
337         guid = GUID_random();
338
339         /* get our invicationId */
340         our_invocation_id = samdb_ntds_invocation_id(module->ldb);
341         if (!our_invocation_id) {
342                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
343                               "replmd_add_originating: unable to find invocationId\n");
344                 return LDB_ERR_OPERATIONS_ERROR;
345         }
346
347         /* create a copy of the request */
348         down_req = talloc(req, struct ldb_request);
349         if (down_req == NULL) {
350                 ldb_oom(module->ldb);
351                 return LDB_ERR_OPERATIONS_ERROR;
352         }
353         *down_req = *req;
354
355         /* we have to copy the message as the caller might have it as a const */
356         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
357         if (msg == NULL) {
358                 talloc_free(down_req);
359                 ldb_oom(module->ldb);
360                 return LDB_ERR_OPERATIONS_ERROR;
361         }
362
363         /* generated times */
364         unix_to_nt_time(&now, t);
365         time_str = ldb_timestring(msg, t);
366         if (!time_str) {
367                 talloc_free(down_req);
368                 return LDB_ERR_OPERATIONS_ERROR;
369         }
370
371         /*
372          * get details of the rdn name
373          */
374         rdn_name        = ldb_dn_get_rdn_name(msg->dn);
375         if (!rdn_name) {
376                 talloc_free(down_req);
377                 ldb_oom(module->ldb);
378                 return LDB_ERR_OPERATIONS_ERROR;
379         }
380         rdn_attr        = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
381         if (!rdn_attr) {
382                 talloc_free(down_req);
383                 return LDB_ERR_OPERATIONS_ERROR;
384         }
385         rdn_value       = ldb_dn_get_rdn_val(msg->dn);
386         if (!rdn_value) {
387                 talloc_free(down_req);
388                 ldb_oom(module->ldb);
389                 return LDB_ERR_OPERATIONS_ERROR;
390         }
391
392         /* 
393          * remove autogenerated attributes
394          */
395         ldb_msg_remove_attr(msg, rdn_name);
396         ldb_msg_remove_attr(msg, "name");
397         ldb_msg_remove_attr(msg, "whenCreated");
398         ldb_msg_remove_attr(msg, "whenChanged");
399         ldb_msg_remove_attr(msg, "uSNCreated");
400         ldb_msg_remove_attr(msg, "uSNChanged");
401         ldb_msg_remove_attr(msg, "replPropertyMetaData");
402
403         /*
404          * TODO: construct a new DN out of:
405          *       - the parent DN
406          *       - the upper case of rdn_attr->LDAPDisplayName
407          *       - rdn_value
408          */
409         new_dn = ldb_dn_copy(msg, msg->dn);
410         if (!new_dn) {
411                 talloc_free(down_req);
412                 ldb_oom(module->ldb);
413                 return LDB_ERR_OPERATIONS_ERROR;
414         }
415         rdn_name_upper = strupper_talloc(msg, rdn_attr->lDAPDisplayName);
416         if (!rdn_name_upper) {
417                 talloc_free(down_req);
418                 ldb_oom(module->ldb);
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421         ret = ldb_dn_set_component(new_dn, 0, rdn_name_upper, *rdn_value);
422         if (ret != LDB_SUCCESS) {
423                 talloc_free(down_req);
424                 ldb_oom(module->ldb);
425                 return LDB_ERR_OPERATIONS_ERROR;
426         }
427         msg->dn = new_dn;
428
429         /*
430          * TODO: calculate correct instance type
431          */
432         instance_type = INSTANCE_TYPE_WRITE;
433         if (ldb_dn_compare(partition->dn, msg->dn) == 0) {
434                 instance_type |= INSTANCE_TYPE_IS_NC_HEAD;
435                 if (ldb_dn_compare(msg->dn, samdb_base_dn(module->ldb)) != 0) {
436                         instance_type |= INSTANCE_TYPE_NC_ABOVE;
437                 }
438         }
439
440         /*
441          * readd replicated attributes
442          */
443         ret = ldb_msg_add_value(msg, rdn_attr->lDAPDisplayName, rdn_value, NULL);
444         if (ret != LDB_SUCCESS) {
445                 talloc_free(down_req);
446                 ldb_oom(module->ldb);
447                 return LDB_ERR_OPERATIONS_ERROR;
448         }
449         ret = ldb_msg_add_value(msg, "name", rdn_value, NULL);
450         if (ret != LDB_SUCCESS) {
451                 talloc_free(down_req);
452                 ldb_oom(module->ldb);
453                 return LDB_ERR_OPERATIONS_ERROR;
454         }
455         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
456         if (ret != LDB_SUCCESS) {
457                 talloc_free(down_req);
458                 ldb_oom(module->ldb);
459                 return LDB_ERR_OPERATIONS_ERROR;
460         }
461         ret = ldb_msg_add_fmt(msg, "instanceType", "%u", instance_type);
462         if (ret != LDB_SUCCESS) {
463                 talloc_free(down_req);
464                 ldb_oom(module->ldb);
465                 return LDB_ERR_OPERATIONS_ERROR;
466         }
467
468         /* build the replication meta_data */
469         ZERO_STRUCT(nmd);
470         nmd.version             = 1;
471         nmd.ctr.ctr1.count      = msg->num_elements;
472         nmd.ctr.ctr1.array      = talloc_array(msg,
473                                                struct replPropertyMetaData1,
474                                                nmd.ctr.ctr1.count);
475         if (!nmd.ctr.ctr1.array) {
476                 talloc_free(down_req);
477                 ldb_oom(module->ldb);
478                 return LDB_ERR_OPERATIONS_ERROR;
479         }
480
481         for (i=0; i < msg->num_elements; i++) {
482                 struct ldb_message_element *e = &msg->elements[i];
483                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
484                 const struct dsdb_attribute *sa;
485
486                 if (e->name[0] == '@') continue;
487
488                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
489                 if (!sa) {
490                         ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
491                                       "replmd_add_originating: attribute '%s' not defined in schema\n",
492                                       e->name);
493                         talloc_free(down_req);
494                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
495                 }
496
497                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
498                         /* if the attribute is not replicated (0x00000001)
499                          * or constructed (0x00000004) it has no metadata
500                          */
501                         continue;
502                 }
503
504                 m->attid                        = sa->attributeID_id;
505                 m->version                      = 1;
506                 m->originating_change_time      = now;
507                 m->originating_invocation_id    = *our_invocation_id;
508                 m->originating_usn              = seq_num;
509                 m->local_usn                    = seq_num;
510                 ni++;
511         }
512
513         /* fix meta data count */
514         nmd.ctr.ctr1.count = ni;
515
516         /*
517          * sort meta data array, and move the rdn attribute entry to the end
518          */
519         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
520
521         /* generated NDR encoded values */
522         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
523                                        NULL,
524                                        &guid,
525                                        (ndr_push_flags_fn_t)ndr_push_GUID);
526         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
527                 ldb_oom(module->ldb);
528                 return LDB_ERR_OPERATIONS_ERROR;
529         }
530         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
531                                        lp_iconv_convenience(ldb_get_opaque(module->ldb, "loadparm")),
532                                        &nmd,
533                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
534         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
535                 talloc_free(down_req);
536                 ldb_oom(module->ldb);
537                 return LDB_ERR_OPERATIONS_ERROR;
538         }
539
540         /*
541          * add the autogenerated values
542          */
543         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
544         if (ret != LDB_SUCCESS) {
545                 talloc_free(down_req);
546                 ldb_oom(module->ldb);
547                 return LDB_ERR_OPERATIONS_ERROR;
548         }
549         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
550         if (ret != LDB_SUCCESS) {
551                 talloc_free(down_req);
552                 ldb_oom(module->ldb);
553                 return LDB_ERR_OPERATIONS_ERROR;
554         }
555         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNCreated", seq_num);
556         if (ret != LDB_SUCCESS) {
557                 talloc_free(down_req);
558                 ldb_oom(module->ldb);
559                 return LDB_ERR_OPERATIONS_ERROR;
560         }
561         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNChanged", seq_num);
562         if (ret != LDB_SUCCESS) {
563                 talloc_free(down_req);
564                 ldb_oom(module->ldb);
565                 return LDB_ERR_OPERATIONS_ERROR;
566         }
567         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
568         if (ret != LDB_SUCCESS) {
569                 talloc_free(down_req);
570                 ldb_oom(module->ldb);
571                 return LDB_ERR_OPERATIONS_ERROR;
572         }
573
574         /*
575          * sort the attributes by attid before storing the object
576          */
577         replmd_ldb_message_sort(msg, schema);
578
579         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
580
581         /* go on with the call chain */
582         ret = ldb_next_request(module, down_req);
583
584         /* do not free down_req as the call results may be linked to it,
585          * it will be freed when the upper level request get freed */
586         if (ret == LDB_SUCCESS) {
587                 req->handle = down_req->handle;
588         }
589
590         return ret;
591 }
592
593 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
594 {
595         return replmd_prepare_originating(module, req, req->op.add.message->dn,
596                                           "replmd_add", replmd_add_originating);
597 }
598
599 static int replmd_modify_originating(struct ldb_module *module,
600                                      struct ldb_request *req,
601                                      const struct dsdb_schema *schema,
602                                      const struct dsdb_control_current_partition *partition)
603 {
604         struct ldb_request *down_req;
605         struct ldb_message *msg;
606         int ret;
607         time_t t = time(NULL);
608         uint64_t seq_num;
609
610         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
611
612         down_req = talloc(req, struct ldb_request);
613         if (down_req == NULL) {
614                 return LDB_ERR_OPERATIONS_ERROR;
615         }
616
617         *down_req = *req;
618
619         /* we have to copy the message as the caller might have it as a const */
620         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
621         if (msg == NULL) {
622                 talloc_free(down_req);
623                 return LDB_ERR_OPERATIONS_ERROR;
624         }
625
626         if (add_time_element(msg, "whenChanged", t) != 0) {
627                 talloc_free(down_req);
628                 return LDB_ERR_OPERATIONS_ERROR;
629         }
630
631         /* Get a sequence number from the backend */
632         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
633         if (ret == LDB_SUCCESS) {
634                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
635                         talloc_free(down_req);
636                         return LDB_ERR_OPERATIONS_ERROR;
637                 }
638         }
639
640         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
641
642         /* go on with the call chain */
643         ret = ldb_next_request(module, down_req);
644
645         /* do not free down_req as the call results may be linked to it,
646          * it will be freed when the upper level request get freed */
647         if (ret == LDB_SUCCESS) {
648                 req->handle = down_req->handle;
649         }
650
651         return ret;
652 }
653
654 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
655 {
656         return replmd_prepare_originating(module, req, req->op.mod.message->dn,
657                                           "replmd_modify", replmd_modify_originating);
658 }
659
660 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
661 {
662         struct ldb_reply *ares = NULL;
663
664         ar->handle->status = ret;
665         ar->handle->state = LDB_ASYNC_DONE;
666
667         if (!ar->orig_req->callback) {
668                 return LDB_SUCCESS;
669         }
670         
671         /* we're done and need to report the success to the caller */
672         ares = talloc_zero(ar, struct ldb_reply);
673         if (!ares) {
674                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
675                 ar->handle->state = LDB_ASYNC_DONE;
676                 return LDB_ERR_OPERATIONS_ERROR;
677         }
678
679         ares->type      = LDB_REPLY_EXTENDED;
680         ares->response  = NULL;
681
682         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
683 }
684
685 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
686 {
687         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
688 }
689
690 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
691 {
692         return replmd_replicated_request_reply_helper(ar, ret);
693 }
694
695 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
696 {
697         int ret = LDB_ERR_OTHER;
698         /* TODO: do some error mapping */
699         return replmd_replicated_request_reply_helper(ar, ret);
700 }
701
702 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
703
704 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
705                                                 void *private_data,
706                                                 struct ldb_reply *ares)
707 {
708 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
709         struct replmd_replicated_request *ar = talloc_get_type(private_data,
710                                                struct replmd_replicated_request);
711
712         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
713         if (ar->sub.change_ret != LDB_SUCCESS) {
714                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
715         }
716
717         talloc_free(ar->sub.mem_ctx);
718         ZERO_STRUCT(ar->sub);
719
720         ar->index_current++;
721
722         return replmd_replicated_apply_next(ar);
723 #else
724         return LDB_SUCCESS;
725 #endif
726 }
727
728 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
729 {
730         enum ndr_err_code ndr_err;
731         struct ldb_message *msg;
732         struct replPropertyMetaDataBlob *md;
733         struct ldb_val md_value;
734         uint32_t i;
735         uint64_t seq_num;
736         int ret;
737
738         /*
739          * TODO: check if the parent object exist
740          */
741
742         /*
743          * TODO: handle the conflict case where an object with the
744          *       same name exist
745          */
746
747         msg = ar->objs->objects[ar->index_current].msg;
748         md = ar->objs->objects[ar->index_current].meta_data;
749
750         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
751         if (ret != LDB_SUCCESS) {
752                 return replmd_replicated_request_error(ar, ret);
753         }
754
755         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
756         if (ret != LDB_SUCCESS) {
757                 return replmd_replicated_request_error(ar, ret);
758         }
759
760         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
761         if (ret != LDB_SUCCESS) {
762                 return replmd_replicated_request_error(ar, ret);
763         }
764
765         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
766         if (ret != LDB_SUCCESS) {
767                 return replmd_replicated_request_error(ar, ret);
768         }
769
770         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
771         if (ret != LDB_SUCCESS) {
772                 return replmd_replicated_request_error(ar, ret);
773         }
774
775         /*
776          * the meta data array is already sorted by the caller
777          */
778         for (i=0; i < md->ctr.ctr1.count; i++) {
779                 md->ctr.ctr1.array[i].local_usn = seq_num;
780         }
781         ndr_err = ndr_push_struct_blob(&md_value, msg, 
782                                        lp_iconv_convenience(global_loadparm),
783                                        md,
784                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
785         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
786                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
787                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
788         }
789         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
790         if (ret != LDB_SUCCESS) {
791                 return replmd_replicated_request_error(ar, ret);
792         }
793
794         replmd_ldb_message_sort(msg, ar->schema);
795
796         ret = ldb_build_add_req(&ar->sub.change_req,
797                                 ar->module->ldb,
798                                 ar->sub.mem_ctx,
799                                 msg,
800                                 NULL,
801                                 ar,
802                                 replmd_replicated_apply_add_callback);
803         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
804
805 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
806         return ldb_next_request(ar->module, ar->sub.change_req);
807 #else
808         ret = ldb_next_request(ar->module, ar->sub.change_req);
809         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
810
811         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
812         if (ar->sub.change_ret != LDB_SUCCESS) {
813                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
814         }
815
816         talloc_free(ar->sub.mem_ctx);
817         ZERO_STRUCT(ar->sub);
818
819         ar->index_current++;
820
821         return LDB_SUCCESS;
822 #endif
823 }
824
825 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
826                                                          struct replPropertyMetaData1 *m2)
827 {
828         int ret;
829
830         if (m1->version != m2->version) {
831                 return m1->version - m2->version;
832         }
833
834         if (m1->originating_change_time != m2->originating_change_time) {
835                 return m1->originating_change_time - m2->originating_change_time;
836         }
837
838         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
839         if (ret != 0) {
840                 return ret;
841         }
842
843         return m1->originating_usn - m2->originating_usn;
844 }
845
846 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
847                                                   void *private_data,
848                                                   struct ldb_reply *ares)
849 {
850 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
851         struct replmd_replicated_request *ar = talloc_get_type(private_data,
852                                                struct replmd_replicated_request);
853
854         ret = ldb_next_request(ar->module, ar->sub.change_req);
855         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
856
857         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
858         if (ar->sub.change_ret != LDB_SUCCESS) {
859                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
860         }
861
862         talloc_free(ar->sub.mem_ctx);
863         ZERO_STRUCT(ar->sub);
864
865         ar->index_current++;
866
867         return LDB_SUCCESS;
868 #else
869         return LDB_SUCCESS;
870 #endif
871 }
872
873 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
874 {
875         enum ndr_err_code ndr_err;
876         struct ldb_message *msg;
877         struct replPropertyMetaDataBlob *rmd;
878         struct replPropertyMetaDataBlob omd;
879         const struct ldb_val *omd_value;
880         struct replPropertyMetaDataBlob nmd;
881         struct ldb_val nmd_value;
882         uint32_t i,j,ni=0;
883         uint32_t removed_attrs = 0;
884         uint64_t seq_num;
885         int ret;
886
887         msg = ar->objs->objects[ar->index_current].msg;
888         rmd = ar->objs->objects[ar->index_current].meta_data;
889         ZERO_STRUCT(omd);
890         omd.version = 1;
891
892         /*
893          * TODO: add rename conflict handling
894          */
895         if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
896                 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
897                               ar->index_current);
898                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
899                           ldb_dn_get_linearized(ar->sub.search_msg->dn),
900                           ldb_dn_get_linearized(msg->dn));
901                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
902         }
903
904         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
905         if (ret != LDB_SUCCESS) {
906                 return replmd_replicated_request_error(ar, ret);
907         }
908
909         /* find existing meta data */
910         omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
911         if (omd_value) {
912                 ndr_err = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, 
913                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &omd,
914                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
915                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
916                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
917                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
918                 }
919
920                 if (omd.version != 1) {
921                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
922                 }
923         }
924
925         ZERO_STRUCT(nmd);
926         nmd.version = 1;
927         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
928         nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
929                                           struct replPropertyMetaData1,
930                                           nmd.ctr.ctr1.count);
931         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
932
933         /* first copy the old meta data */
934         for (i=0; i < omd.ctr.ctr1.count; i++) {
935                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
936                 ni++;
937         }
938
939         /* now merge in the new meta data */
940         for (i=0; i < rmd->ctr.ctr1.count; i++) {
941                 bool found = false;
942
943                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
944
945                 for (j=0; j < ni; j++) {
946                         int cmp;
947
948                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
949                                 continue;
950                         }
951
952                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
953                                                                             &nmd.ctr.ctr1.array[j]);
954                         if (cmp > 0) {
955                                 /* replace the entry */
956                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
957                                 found = true;
958                                 break;
959                         }
960
961                         /* we don't want to apply this change so remove the attribute */
962                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
963                         removed_attrs++;
964
965                         found = true;
966                         break;
967                 }
968
969                 if (found) continue;
970
971                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
972                 ni++;
973         }
974
975         /*
976          * finally correct the size of the meta_data array
977          */
978         nmd.ctr.ctr1.count = ni;
979
980         /*
981          * the rdn attribute (the alias for the name attribute),
982          * 'cn' for most objects is the last entry in the meta data array
983          * we have stored
984          *
985          * sort the new meta data array
986          */
987         {
988                 struct replPropertyMetaData1 *rdn_p;
989                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
990
991                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
992                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
993         }
994
995         /* create the meta data value */
996         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
997                                        lp_iconv_convenience(global_loadparm),
998                                        &nmd,
999                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1000         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1001                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1002                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1003         }
1004
1005         /*
1006          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1007          */
1008         if (msg->num_elements == 0) {
1009                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1010                           ar->index_current);
1011                 goto next_object;
1012         }
1013
1014         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1015                   ar->index_current, msg->num_elements);
1016
1017         /*
1018          * when we now that we'll modify the record, add the whenChanged, uSNChanged
1019          * and replPopertyMetaData attributes
1020          */
1021         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1022         if (ret != LDB_SUCCESS) {
1023                 return replmd_replicated_request_error(ar, ret);
1024         }
1025         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
1026         if (ret != LDB_SUCCESS) {
1027                 return replmd_replicated_request_error(ar, ret);
1028         }
1029         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1030         if (ret != LDB_SUCCESS) {
1031                 return replmd_replicated_request_error(ar, ret);
1032         }
1033
1034         replmd_ldb_message_sort(msg, ar->schema);
1035
1036         /* we want to replace the old values */
1037         for (i=0; i < msg->num_elements; i++) {
1038                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1039         }
1040
1041         ret = ldb_build_mod_req(&ar->sub.change_req,
1042                                 ar->module->ldb,
1043                                 ar->sub.mem_ctx,
1044                                 msg,
1045                                 NULL,
1046                                 ar,
1047                                 replmd_replicated_apply_merge_callback);
1048         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1049
1050 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1051         return ldb_next_request(ar->module, ar->sub.change_req);
1052 #else
1053         ret = ldb_next_request(ar->module, ar->sub.change_req);
1054         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1055
1056         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1057         if (ar->sub.change_ret != LDB_SUCCESS) {
1058                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1059         }
1060
1061 next_object:
1062         talloc_free(ar->sub.mem_ctx);
1063         ZERO_STRUCT(ar->sub);
1064
1065         ar->index_current++;
1066
1067         return LDB_SUCCESS;
1068 #endif
1069 }
1070
1071 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
1072                                                    void *private_data,
1073                                                    struct ldb_reply *ares)
1074 {
1075         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1076                                                struct replmd_replicated_request);
1077         bool is_done = false;
1078
1079         switch (ares->type) {
1080         case LDB_REPLY_ENTRY:
1081                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1082                 break;
1083         case LDB_REPLY_REFERRAL:
1084                 /* we ignore referrals */
1085                 break;
1086         case LDB_REPLY_EXTENDED:
1087         case LDB_REPLY_DONE:
1088                 is_done = true;
1089         }
1090
1091         talloc_free(ares);
1092
1093 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1094         if (is_done) {
1095                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1096                 if (ar->sub.search_ret != LDB_SUCCESS) {
1097                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1098                 }
1099                 if (ar->sub.search_msg) {
1100                         return replmd_replicated_apply_merge(ar);
1101                 }
1102                 return replmd_replicated_apply_add(ar);
1103         }
1104 #endif
1105         return LDB_SUCCESS;
1106 }
1107
1108 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
1109 {
1110         int ret;
1111         char *tmp_str;
1112         char *filter;
1113
1114         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
1115         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1116
1117         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
1118         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1119         talloc_free(tmp_str);
1120
1121         ret = ldb_build_search_req(&ar->sub.search_req,
1122                                    ar->module->ldb,
1123                                    ar->sub.mem_ctx,
1124                                    ar->objs->partition_dn,
1125                                    LDB_SCOPE_SUBTREE,
1126                                    filter,
1127                                    NULL,
1128                                    NULL,
1129                                    ar,
1130                                    replmd_replicated_apply_search_callback);
1131         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1132
1133 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1134         return ldb_next_request(ar->module, ar->sub.search_req);
1135 #else
1136         ret = ldb_next_request(ar->module, ar->sub.search_req);
1137         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1138
1139         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1140         if (ar->sub.search_ret != LDB_SUCCESS) {
1141                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1142         }
1143         if (ar->sub.search_msg) {
1144                 return replmd_replicated_apply_merge(ar);
1145         }
1146
1147         return replmd_replicated_apply_add(ar);
1148 #endif
1149 }
1150
1151 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1152 {
1153 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1154         if (ar->index_current >= ar->objs->num_objects) {
1155                 return replmd_replicated_uptodate_vector(ar);
1156         }
1157 #endif
1158
1159         ar->sub.mem_ctx = talloc_new(ar);
1160         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1161
1162         return replmd_replicated_apply_search(ar);
1163 }
1164
1165 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
1166                                                       void *private_data,
1167                                                       struct ldb_reply *ares)
1168 {
1169 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1170         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1171                                                struct replmd_replicated_request);
1172
1173         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1174         if (ar->sub.change_ret != LDB_SUCCESS) {
1175                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1176         }
1177
1178         talloc_free(ar->sub.mem_ctx);
1179         ZERO_STRUCT(ar->sub);
1180
1181         return replmd_replicated_request_done(ar);
1182 #else
1183         return LDB_SUCCESS;
1184 #endif
1185 }
1186
1187 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1188                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1189 {
1190         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1191 }
1192
1193 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1194 {
1195         enum ndr_err_code ndr_err;
1196         struct ldb_message *msg;
1197         struct replUpToDateVectorBlob ouv;
1198         const struct ldb_val *ouv_value;
1199         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1200         struct replUpToDateVectorBlob nuv;
1201         struct ldb_val nuv_value;
1202         struct ldb_message_element *nuv_el = NULL;
1203         const struct GUID *our_invocation_id;
1204         struct ldb_message_element *orf_el = NULL;
1205         struct repsFromToBlob nrf;
1206         struct ldb_val *nrf_value = NULL;
1207         struct ldb_message_element *nrf_el = NULL;
1208         uint32_t i,j,ni=0;
1209         uint64_t seq_num;
1210         bool found = false;
1211         time_t t = time(NULL);
1212         NTTIME now;
1213         int ret;
1214
1215         ruv = ar->objs->uptodateness_vector;
1216         ZERO_STRUCT(ouv);
1217         ouv.version = 2;
1218         ZERO_STRUCT(nuv);
1219         nuv.version = 2;
1220
1221         unix_to_nt_time(&now, t);
1222
1223         /* 
1224          * we use the next sequence number for our own highest_usn
1225          * because we will do a modify request and this will increment
1226          * our highest_usn
1227          */
1228         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
1229         if (ret != LDB_SUCCESS) {
1230                 return replmd_replicated_request_error(ar, ret);
1231         }
1232
1233         /*
1234          * first create the new replUpToDateVector
1235          */
1236         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
1237         if (ouv_value) {
1238                 ndr_err = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, 
1239                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &ouv,
1240                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1241                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1242                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1243                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1244                 }
1245
1246                 if (ouv.version != 2) {
1247                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1248                 }
1249         }
1250
1251         /*
1252          * the new uptodateness vector will at least
1253          * contain 1 entry, one for the source_dsa
1254          *
1255          * plus optional values from our old vector and the one from the source_dsa
1256          */
1257         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1258         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1259         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
1260                                             struct drsuapi_DsReplicaCursor2,
1261                                             nuv.ctr.ctr2.count);
1262         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1263
1264         /* first copy the old vector */
1265         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1266                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1267                 ni++;
1268         }
1269
1270         /* get our invocation_id if we have one already attached to the ldb */
1271         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1272
1273         /* merge in the source_dsa vector is available */
1274         for (i=0; (ruv && i < ruv->count); i++) {
1275                 found = false;
1276
1277                 if (our_invocation_id &&
1278                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1279                                our_invocation_id)) {
1280                         continue;
1281                 }
1282
1283                 for (j=0; j < ni; j++) {
1284                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1285                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1286                                 continue;
1287                         }
1288
1289                         found = true;
1290
1291                         /*
1292                          * we update only the highest_usn and not the latest_sync_success time,
1293                          * because the last success stands for direct replication
1294                          */
1295                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1296                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1297                         }
1298                         break;                  
1299                 }
1300
1301                 if (found) continue;
1302
1303                 /* if it's not there yet, add it */
1304                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1305                 ni++;
1306         }
1307
1308         /*
1309          * merge in the current highwatermark for the source_dsa
1310          */
1311         found = false;
1312         for (j=0; j < ni; j++) {
1313                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1314                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1315                         continue;
1316                 }
1317
1318                 found = true;
1319
1320                 /*
1321                  * here we update the highest_usn and last_sync_success time
1322                  * because we're directly replicating from the source_dsa
1323                  *
1324                  * and use the tmp_highest_usn because this is what we have just applied
1325                  * to our ldb
1326                  */
1327                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1328                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1329                 break;
1330         }
1331         if (!found) {
1332                 /*
1333                  * here we update the highest_usn and last_sync_success time
1334                  * because we're directly replicating from the source_dsa
1335                  *
1336                  * and use the tmp_highest_usn because this is what we have just applied
1337                  * to our ldb
1338                  */
1339                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1340                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1341                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1342                 ni++;
1343         }
1344
1345         /*
1346          * finally correct the size of the cursors array
1347          */
1348         nuv.ctr.ctr2.count = ni;
1349
1350         /*
1351          * sort the cursors
1352          */
1353         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1354               sizeof(struct drsuapi_DsReplicaCursor2),
1355               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1356
1357         /*
1358          * create the change ldb_message
1359          */
1360         msg = ldb_msg_new(ar->sub.mem_ctx);
1361         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1362         msg->dn = ar->sub.search_msg->dn;
1363
1364         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1365                                        lp_iconv_convenience(global_loadparm), 
1366                                        &nuv,
1367                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1368         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1369                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1370                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1371         }
1372         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1373         if (ret != LDB_SUCCESS) {
1374                 return replmd_replicated_request_error(ar, ret);
1375         }
1376         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1377
1378         /*
1379          * now create the new repsFrom value from the given repsFromTo1 structure
1380          */
1381         ZERO_STRUCT(nrf);
1382         nrf.version                                     = 1;
1383         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1384         /* and fix some values... */
1385         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1386         nrf.ctr.ctr1.last_success                       = now;
1387         nrf.ctr.ctr1.last_attempt                       = now;
1388         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1389         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1390
1391         /*
1392          * first see if we already have a repsFrom value for the current source dsa
1393          * if so we'll later replace this value
1394          */
1395         orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1396         if (orf_el) {
1397                 for (i=0; i < orf_el->num_values; i++) {
1398                         struct repsFromToBlob *trf;
1399
1400                         trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1401                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1402
1403                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), trf,
1404                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1405                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1406                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1407                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1408                         }
1409
1410                         if (trf->version != 1) {
1411                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1412                         }
1413
1414                         /*
1415                          * we compare the source dsa objectGUID not the invocation_id
1416                          * because we want only one repsFrom value per source dsa
1417                          * and when the invocation_id of the source dsa has changed we don't need 
1418                          * the old repsFrom with the old invocation_id
1419                          */
1420                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1421                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1422                                 talloc_free(trf);
1423                                 continue;
1424                         }
1425
1426                         talloc_free(trf);
1427                         nrf_value = &orf_el->values[i];
1428                         break;
1429                 }
1430
1431                 /*
1432                  * copy over all old values to the new ldb_message
1433                  */
1434                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1435                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1436                 *nrf_el = *orf_el;
1437         }
1438
1439         /*
1440          * if we haven't found an old repsFrom value for the current source dsa
1441          * we'll add a new value
1442          */
1443         if (!nrf_value) {
1444                 struct ldb_val zero_value;
1445                 ZERO_STRUCT(zero_value);
1446                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1447                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1448
1449                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1450         }
1451
1452         /* we now fill the value which is already attached to ldb_message */
1453         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1454                                        lp_iconv_convenience(global_loadparm),
1455                                        &nrf,
1456                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1457         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1458                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1459                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1460         }
1461
1462         /* 
1463          * the ldb_message_element for the attribute, has all the old values and the new one
1464          * so we'll replace the whole attribute with all values
1465          */
1466         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1467
1468         /* prepare the ldb_modify() request */
1469         ret = ldb_build_mod_req(&ar->sub.change_req,
1470                                 ar->module->ldb,
1471                                 ar->sub.mem_ctx,
1472                                 msg,
1473                                 NULL,
1474                                 ar,
1475                                 replmd_replicated_uptodate_modify_callback);
1476         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1477
1478 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1479         return ldb_next_request(ar->module, ar->sub.change_req);
1480 #else
1481         ret = ldb_next_request(ar->module, ar->sub.change_req);
1482         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1483
1484         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1485         if (ar->sub.change_ret != LDB_SUCCESS) {
1486                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1487         }
1488
1489         talloc_free(ar->sub.mem_ctx);
1490         ZERO_STRUCT(ar->sub);
1491
1492         return replmd_replicated_request_done(ar);
1493 #endif
1494 }
1495
1496 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1497                                                       void *private_data,
1498                                                       struct ldb_reply *ares)
1499 {
1500         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1501                                                struct replmd_replicated_request);
1502         bool is_done = false;
1503
1504         switch (ares->type) {
1505         case LDB_REPLY_ENTRY:
1506                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1507                 break;
1508         case LDB_REPLY_REFERRAL:
1509                 /* we ignore referrals */
1510                 break;
1511         case LDB_REPLY_EXTENDED:
1512         case LDB_REPLY_DONE:
1513                 is_done = true;
1514         }
1515
1516         talloc_free(ares);
1517
1518 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1519         if (is_done) {
1520                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1521                 if (ar->sub.search_ret != LDB_SUCCESS) {
1522                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1523                 }
1524                 if (!ar->sub.search_msg) {
1525                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1526                 }
1527
1528                 return replmd_replicated_uptodate_modify(ar);
1529         }
1530 #endif
1531         return LDB_SUCCESS;
1532 }
1533
1534 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1535 {
1536         int ret;
1537         static const char *attrs[] = {
1538                 "replUpToDateVector",
1539                 "repsFrom",
1540                 NULL
1541         };
1542
1543         ret = ldb_build_search_req(&ar->sub.search_req,
1544                                    ar->module->ldb,
1545                                    ar->sub.mem_ctx,
1546                                    ar->objs->partition_dn,
1547                                    LDB_SCOPE_BASE,
1548                                    "(objectClass=*)",
1549                                    attrs,
1550                                    NULL,
1551                                    ar,
1552                                    replmd_replicated_uptodate_search_callback);
1553         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1554
1555 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1556         return ldb_next_request(ar->module, ar->sub.search_req);
1557 #else
1558         ret = ldb_next_request(ar->module, ar->sub.search_req);
1559         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1560
1561         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1562         if (ar->sub.search_ret != LDB_SUCCESS) {
1563                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1564         }
1565         if (!ar->sub.search_msg) {
1566                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1567         }
1568
1569         return replmd_replicated_uptodate_modify(ar);
1570 #endif
1571 }
1572
1573 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1574 {
1575         ar->sub.mem_ctx = talloc_new(ar);
1576         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1577
1578         return replmd_replicated_uptodate_search(ar);
1579 }
1580
1581 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1582 {
1583         struct dsdb_extended_replicated_objects *objs;
1584         struct replmd_replicated_request *ar;
1585
1586         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1587
1588         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1589         if (!objs) {
1590                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1591                 return LDB_ERR_PROTOCOL_ERROR;
1592         }
1593
1594         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1595                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1596                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1597                 return LDB_ERR_PROTOCOL_ERROR;
1598         }
1599
1600         ar = replmd_replicated_init_handle(module, req, objs);
1601         if (!ar) {
1602                 return LDB_ERR_OPERATIONS_ERROR;
1603         }
1604
1605 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1606         return replmd_replicated_apply_next(ar);
1607 #else
1608         while (ar->index_current < ar->objs->num_objects &&
1609                req->handle->state != LDB_ASYNC_DONE) { 
1610                 replmd_replicated_apply_next(ar);
1611         }
1612
1613         if (req->handle->state != LDB_ASYNC_DONE) {
1614                 replmd_replicated_uptodate_vector(ar);
1615         }
1616
1617         return LDB_SUCCESS;
1618 #endif
1619 }
1620
1621 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1622 {
1623         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1624                 return replmd_extended_replicated_objects(module, req);
1625         }
1626
1627         return ldb_next_request(module, req);
1628 }
1629
1630 static int replmd_wait_none(struct ldb_handle *handle) {
1631         struct replmd_replicated_request *ar;
1632     
1633         if (!handle || !handle->private_data) {
1634                 return LDB_ERR_OPERATIONS_ERROR;
1635         }
1636
1637         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1638         if (!ar) {
1639                 return LDB_ERR_OPERATIONS_ERROR;
1640         }
1641
1642         /* we do only sync calls */
1643         if (handle->state != LDB_ASYNC_DONE) {
1644                 return LDB_ERR_OPERATIONS_ERROR;
1645         }
1646
1647         return handle->status;
1648 }
1649
1650 static int replmd_wait_all(struct ldb_handle *handle) {
1651
1652         int ret;
1653
1654         while (handle->state != LDB_ASYNC_DONE) {
1655                 ret = replmd_wait_none(handle);
1656                 if (ret != LDB_SUCCESS) {
1657                         return ret;
1658                 }
1659         }
1660
1661         return handle->status;
1662 }
1663
1664 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1665 {
1666         if (type == LDB_WAIT_ALL) {
1667                 return replmd_wait_all(handle);
1668         } else {
1669                 return replmd_wait_none(handle);
1670         }
1671 }
1672
1673 static const struct ldb_module_ops replmd_ops = {
1674         .name          = "repl_meta_data",
1675         .add           = replmd_add,
1676         .modify        = replmd_modify,
1677         .extended      = replmd_extended,
1678         .wait          = replmd_wait
1679 };
1680
1681 int repl_meta_data_module_init(void)
1682 {
1683         return ldb_register_module(&replmd_ops);
1684 }