Define GNU_SOURCE, required if libreplace doesn't provide comparison_fn_t,
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb repl_meta_data module
31  *
32  *  Description: - add a unique objectGUID onto every new record,
33  *               - handle whenCreated, whenChanged timestamps
34  *               - handle uSNCreated, uSNChanged numbers
35  *               - handle replPropertyMetaData attribute
36  *
37  *  Author: Simo Sorce
38  *  Author: Stefan Metzmacher
39  */
40
41 #include "includes.h"
42 #include "lib/ldb/include/ldb.h"
43 #include "lib/ldb/include/ldb_errors.h"
44 #include "lib/ldb/include/ldb_private.h"
45 #include "dsdb/samdb/samdb.h"
46 #include "dsdb/common/flags.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50 #include "param/param.h"
51
52 struct replmd_replicated_request {
53         struct ldb_module *module;
54         struct ldb_request *req;
55
56         const struct dsdb_schema *schema;
57
58         struct dsdb_extended_replicated_objects *objs;
59
60         /* the controls we pass down */
61         struct ldb_control **controls;
62
63         uint32_t index_current;
64
65         struct ldb_message *search_msg;
66 };
67
68 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
69                                           struct ldb_request *req)
70 {
71         struct replmd_replicated_request *ac;
72
73         ac = talloc_zero(req, struct replmd_replicated_request);
74         if (ac == NULL) {
75                 ldb_oom(module->ldb);
76                 return NULL;
77         }
78
79         ac->module = module;
80         ac->req = req;
81         return ac;
82 }
83
84 /*
85   add a time element to a record
86 */
87 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
88 {
89         struct ldb_message_element *el;
90         char *s;
91
92         if (ldb_msg_find_element(msg, attr) != NULL) {
93                 return LDB_SUCCESS;
94         }
95
96         s = ldb_timestring(msg, t);
97         if (s == NULL) {
98                 return LDB_ERR_OPERATIONS_ERROR;
99         }
100
101         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
102                 return LDB_ERR_OPERATIONS_ERROR;
103         }
104
105         el = ldb_msg_find_element(msg, attr);
106         /* always set as replace. This works because on add ops, the flag
107            is ignored */
108         el->flags = LDB_FLAG_MOD_REPLACE;
109
110         return LDB_SUCCESS;
111 }
112
113 /*
114   add a uint64_t element to a record
115 */
116 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
117 {
118         struct ldb_message_element *el;
119
120         if (ldb_msg_find_element(msg, attr) != NULL) {
121                 return LDB_SUCCESS;
122         }
123
124         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
125                 return LDB_ERR_OPERATIONS_ERROR;
126         }
127
128         el = ldb_msg_find_element(msg, attr);
129         /* always set as replace. This works because on add ops, the flag
130            is ignored */
131         el->flags = LDB_FLAG_MOD_REPLACE;
132
133         return LDB_SUCCESS;
134 }
135
136 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
137                                                    const struct replPropertyMetaData1 *m2,
138                                                    const uint32_t *rdn_attid)
139 {
140         if (m1->attid == m2->attid) {
141                 return 0;
142         }
143
144         /*
145          * the rdn attribute should be at the end!
146          * so we need to return a value greater than zero
147          * which means m1 is greater than m2
148          */
149         if (m1->attid == *rdn_attid) {
150                 return 1;
151         }
152
153         /*
154          * the rdn attribute should be at the end!
155          * so we need to return a value less than zero
156          * which means m2 is greater than m1
157          */
158         if (m2->attid == *rdn_attid) {
159                 return -1;
160         }
161
162         return m1->attid - m2->attid;
163 }
164
165 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
166                                                  const uint32_t *rdn_attid)
167 {
168         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
169                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
170 }
171
172 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
173                                                  const struct ldb_message_element *e2,
174                                                  const struct dsdb_schema *schema)
175 {
176         const struct dsdb_attribute *a1;
177         const struct dsdb_attribute *a2;
178
179         /* 
180          * TODO: make this faster by caching the dsdb_attribute pointer
181          *       on the ldb_messag_element
182          */
183
184         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
185         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
186
187         /*
188          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
189          *       in the schema
190          */
191         if (!a1 || !a2) {
192                 return strcasecmp(e1->name, e2->name);
193         }
194
195         return a1->attributeID_id - a2->attributeID_id;
196 }
197
198 static void replmd_ldb_message_sort(struct ldb_message *msg,
199                                     const struct dsdb_schema *schema)
200 {
201         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
202                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
203 }
204
205 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
206 {
207         struct replmd_replicated_request *ac;
208
209         ac = talloc_get_type(req->context, struct replmd_replicated_request);
210
211         if (!ares) {
212                 return ldb_module_done(ac->req, NULL, NULL,
213                                         LDB_ERR_OPERATIONS_ERROR);
214         }
215         if (ares->error != LDB_SUCCESS) {
216                 return ldb_module_done(ac->req, ares->controls,
217                                         ares->response, ares->error);
218         }
219
220         if (ares->type != LDB_REPLY_DONE) {
221                 ldb_set_errstring(ac->module->ldb,
222                                   "invalid ldb_reply_type in callback");
223                 talloc_free(ares);
224                 return ldb_module_done(ac->req, NULL, NULL,
225                                         LDB_ERR_OPERATIONS_ERROR);
226         }
227
228         return ldb_module_done(ac->req, ares->controls,
229                                 ares->response, LDB_SUCCESS);
230 }
231
232 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
233 {
234         struct replmd_replicated_request *ac;
235         const struct dsdb_schema *schema;
236         enum ndr_err_code ndr_err;
237         struct ldb_request *down_req;
238         struct ldb_message *msg;
239         const struct dsdb_attribute *rdn_attr = NULL;
240         struct GUID guid;
241         struct ldb_val guid_value;
242         struct replPropertyMetaDataBlob nmd;
243         struct ldb_val nmd_value;
244         uint64_t seq_num;
245         const struct GUID *our_invocation_id;
246         time_t t = time(NULL);
247         NTTIME now;
248         char *time_str;
249         int ret;
250         uint32_t i, ni=0;
251
252         /* do not manipulate our control entries */
253         if (ldb_dn_is_special(req->op.add.message->dn)) {
254                 return ldb_next_request(module, req);
255         }
256
257         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add\n");
258
259         schema = dsdb_get_schema(module->ldb);
260         if (!schema) {
261                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
262                               "replmd_modify: no dsdb_schema loaded");
263                 return LDB_ERR_CONSTRAINT_VIOLATION;
264         }
265
266         ac = replmd_ctx_init(module, req);
267         if (!ac) {
268                 return LDB_ERR_OPERATIONS_ERROR;
269         }
270
271         ac->schema = schema;
272
273         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
274                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
275                               "replmd_add: it's not allowed to add an object with objectGUID\n");
276                 return LDB_ERR_UNWILLING_TO_PERFORM;
277         }
278
279         /* Get a sequence number from the backend */
280         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
281         if (ret != LDB_SUCCESS) {
282                 return ret;
283         }
284
285         /* a new GUID */
286         guid = GUID_random();
287
288         /* get our invicationId */
289         our_invocation_id = samdb_ntds_invocation_id(module->ldb);
290         if (!our_invocation_id) {
291                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
292                               "replmd_add: unable to find invocationId\n");
293                 return LDB_ERR_OPERATIONS_ERROR;
294         }
295
296         /* we have to copy the message as the caller might have it as a const */
297         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
298         if (msg == NULL) {
299                 ldb_oom(module->ldb);
300                 return LDB_ERR_OPERATIONS_ERROR;
301         }
302
303         /* generated times */
304         unix_to_nt_time(&now, t);
305         time_str = ldb_timestring(msg, t);
306         if (!time_str) {
307                 return LDB_ERR_OPERATIONS_ERROR;
308         }
309
310         /* 
311          * remove autogenerated attributes
312          */
313         ldb_msg_remove_attr(msg, "whenCreated");
314         ldb_msg_remove_attr(msg, "whenChanged");
315         ldb_msg_remove_attr(msg, "uSNCreated");
316         ldb_msg_remove_attr(msg, "uSNChanged");
317         ldb_msg_remove_attr(msg, "replPropertyMetaData");
318
319         /*
320          * readd replicated attributes
321          */
322         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
323         if (ret != LDB_SUCCESS) {
324                 ldb_oom(module->ldb);
325                 return LDB_ERR_OPERATIONS_ERROR;
326         }
327
328         /* build the replication meta_data */
329         ZERO_STRUCT(nmd);
330         nmd.version             = 1;
331         nmd.ctr.ctr1.count      = msg->num_elements;
332         nmd.ctr.ctr1.array      = talloc_array(msg,
333                                                struct replPropertyMetaData1,
334                                                nmd.ctr.ctr1.count);
335         if (!nmd.ctr.ctr1.array) {
336                 ldb_oom(module->ldb);
337                 return LDB_ERR_OPERATIONS_ERROR;
338         }
339
340         for (i=0; i < msg->num_elements; i++) {
341                 struct ldb_message_element *e = &msg->elements[i];
342                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
343                 const struct dsdb_attribute *sa;
344
345                 if (e->name[0] == '@') continue;
346
347                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
348                 if (!sa) {
349                         ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
350                                       "replmd_add: attribute '%s' not defined in schema\n",
351                                       e->name);
352                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
353                 }
354
355                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
356                         /* if the attribute is not replicated (0x00000001)
357                          * or constructed (0x00000004) it has no metadata
358                          */
359                         continue;
360                 }
361
362                 m->attid                        = sa->attributeID_id;
363                 m->version                      = 1;
364                 m->originating_change_time      = now;
365                 m->originating_invocation_id    = *our_invocation_id;
366                 m->originating_usn              = seq_num;
367                 m->local_usn                    = seq_num;
368                 ni++;
369
370                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
371                         rdn_attr = sa;
372                 }
373         }
374
375         /* fix meta data count */
376         nmd.ctr.ctr1.count = ni;
377
378         /*
379          * sort meta data array, and move the rdn attribute entry to the end
380          */
381         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
382
383         /* generated NDR encoded values */
384         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
385                                        NULL,
386                                        &guid,
387                                        (ndr_push_flags_fn_t)ndr_push_GUID);
388         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
389                 ldb_oom(module->ldb);
390                 return LDB_ERR_OPERATIONS_ERROR;
391         }
392         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
393                                        lp_iconv_convenience(ldb_get_opaque(module->ldb, "loadparm")),
394                                        &nmd,
395                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
396         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
397                 ldb_oom(module->ldb);
398                 return LDB_ERR_OPERATIONS_ERROR;
399         }
400
401         /*
402          * add the autogenerated values
403          */
404         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
405         if (ret != LDB_SUCCESS) {
406                 ldb_oom(module->ldb);
407                 return LDB_ERR_OPERATIONS_ERROR;
408         }
409         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
410         if (ret != LDB_SUCCESS) {
411                 ldb_oom(module->ldb);
412                 return LDB_ERR_OPERATIONS_ERROR;
413         }
414         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNCreated", seq_num);
415         if (ret != LDB_SUCCESS) {
416                 ldb_oom(module->ldb);
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNChanged", seq_num);
420         if (ret != LDB_SUCCESS) {
421                 ldb_oom(module->ldb);
422                 return LDB_ERR_OPERATIONS_ERROR;
423         }
424         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
425         if (ret != LDB_SUCCESS) {
426                 ldb_oom(module->ldb);
427                 return LDB_ERR_OPERATIONS_ERROR;
428         }
429
430         /*
431          * sort the attributes by attid before storing the object
432          */
433         replmd_ldb_message_sort(msg, schema);
434
435         ret = ldb_build_add_req(&down_req, module->ldb, ac,
436                                 msg,
437                                 req->controls,
438                                 ac, replmd_op_callback,
439                                 req);
440         if (ret != LDB_SUCCESS) {
441                 return ret;
442         }
443
444         /* go on with the call chain */
445         return ldb_next_request(module, down_req);
446 }
447
448 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
449 {
450         struct replmd_replicated_request *ac;
451         const struct dsdb_schema *schema;
452         struct ldb_request *down_req;
453         struct ldb_message *msg;
454         int ret;
455         time_t t = time(NULL);
456         uint64_t seq_num;
457
458         /* do not manipulate our control entries */
459         if (ldb_dn_is_special(req->op.mod.message->dn)) {
460                 return ldb_next_request(module, req);
461         }
462
463         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
464
465         schema = dsdb_get_schema(module->ldb);
466         if (!schema) {
467                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
468                               "replmd_modify: no dsdb_schema loaded");
469                 return LDB_ERR_CONSTRAINT_VIOLATION;
470         }
471
472         ac = replmd_ctx_init(module, req);
473         if (!ac) {
474                 return LDB_ERR_OPERATIONS_ERROR;
475         }
476
477         ac->schema = schema;
478
479         /* we have to copy the message as the caller might have it as a const */
480         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
481         if (msg == NULL) {
482                 talloc_free(ac);
483                 return LDB_ERR_OPERATIONS_ERROR;
484         }
485
486         /* TODO:
487          * - get the whole old object
488          * - if the old object doesn't exist report an error
489          * - give an error when a readonly attribute should
490          *   be modified
491          * - merge the changed into the old object
492          *   if the caller set values to the same value
493          *   ignore the attribute, return success when no
494          *   attribute was changed
495          * - calculate the new replPropertyMetaData attribute
496          */
497
498         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
499                 talloc_free(ac);
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502
503         /* Get a sequence number from the backend */
504         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
505         if (ret == LDB_SUCCESS) {
506                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
507                         talloc_free(ac);
508                         return LDB_ERR_OPERATIONS_ERROR;
509                 }
510         }
511
512         /* TODO:
513          * - sort the attributes by attid with replmd_ldb_message_sort()
514          * - replace the old object with the newly constructed one
515          */
516
517         ret = ldb_build_mod_req(&down_req, module->ldb, ac,
518                                 msg,
519                                 req->controls,
520                                 ac, replmd_op_callback,
521                                 req);
522         if (ret != LDB_SUCCESS) {
523                 return ret;
524         }
525         talloc_steal(down_req, msg);
526
527         /* go on with the call chain */
528         return ldb_next_request(module, down_req);
529 }
530
531 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
532 {
533         return ret;
534 }
535
536 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
537 {
538         int ret = LDB_ERR_OTHER;
539         /* TODO: do some error mapping */
540         return ret;
541 }
542
543 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
544
545 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
546                                                 struct ldb_reply *ares)
547 {
548         struct replmd_replicated_request *ar = talloc_get_type(req->context,
549                                                struct replmd_replicated_request);
550         int ret;
551
552
553         if (!ares) {
554                 return ldb_module_done(ar->req, NULL, NULL,
555                                         LDB_ERR_OPERATIONS_ERROR);
556         }
557         if (ares->error != LDB_SUCCESS) {
558                 return ldb_module_done(ar->req, ares->controls,
559                                         ares->response, ares->error);
560         }
561
562         if (ares->type != LDB_REPLY_DONE) {
563                 ldb_set_errstring(ar->module->ldb, "Invalid reply type\n!");
564                 return ldb_module_done(ar->req, NULL, NULL,
565                                         LDB_ERR_OPERATIONS_ERROR);
566         }
567
568         talloc_free(ares);
569         ar->index_current++;
570
571         ret = replmd_replicated_apply_next(ar);
572         if (ret != LDB_SUCCESS) {
573                 return ldb_module_done(ar->req, NULL, NULL, ret);
574         }
575
576         return LDB_SUCCESS;
577 }
578
579 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
580 {
581         struct ldb_request *change_req;
582         enum ndr_err_code ndr_err;
583         struct ldb_message *msg;
584         struct replPropertyMetaDataBlob *md;
585         struct ldb_val md_value;
586         uint32_t i;
587         uint64_t seq_num;
588         int ret;
589
590         /*
591          * TODO: check if the parent object exist
592          */
593
594         /*
595          * TODO: handle the conflict case where an object with the
596          *       same name exist
597          */
598
599         msg = ar->objs->objects[ar->index_current].msg;
600         md = ar->objs->objects[ar->index_current].meta_data;
601
602         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
603         if (ret != LDB_SUCCESS) {
604                 return replmd_replicated_request_error(ar, ret);
605         }
606
607         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
608         if (ret != LDB_SUCCESS) {
609                 return replmd_replicated_request_error(ar, ret);
610         }
611
612         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
613         if (ret != LDB_SUCCESS) {
614                 return replmd_replicated_request_error(ar, ret);
615         }
616
617         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
618         if (ret != LDB_SUCCESS) {
619                 return replmd_replicated_request_error(ar, ret);
620         }
621
622         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
623         if (ret != LDB_SUCCESS) {
624                 return replmd_replicated_request_error(ar, ret);
625         }
626
627         /*
628          * the meta data array is already sorted by the caller
629          */
630         for (i=0; i < md->ctr.ctr1.count; i++) {
631                 md->ctr.ctr1.array[i].local_usn = seq_num;
632         }
633         ndr_err = ndr_push_struct_blob(&md_value, msg, 
634                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
635                                        md,
636                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
637         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
638                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
639                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
640         }
641         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
642         if (ret != LDB_SUCCESS) {
643                 return replmd_replicated_request_error(ar, ret);
644         }
645
646         replmd_ldb_message_sort(msg, ar->schema);
647
648         ret = ldb_build_add_req(&change_req,
649                                 ar->module->ldb,
650                                 ar,
651                                 msg,
652                                 ar->controls,
653                                 ar,
654                                 replmd_replicated_apply_add_callback,
655                                 ar->req);
656         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
657
658         return ldb_next_request(ar->module, change_req);
659 }
660
661 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
662                                                          struct replPropertyMetaData1 *m2)
663 {
664         int ret;
665
666         if (m1->version != m2->version) {
667                 return m1->version - m2->version;
668         }
669
670         if (m1->originating_change_time != m2->originating_change_time) {
671                 return m1->originating_change_time - m2->originating_change_time;
672         }
673
674         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
675         if (ret != 0) {
676                 return ret;
677         }
678
679         return m1->originating_usn - m2->originating_usn;
680 }
681
682 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
683                                                   struct ldb_reply *ares)
684 {
685         struct replmd_replicated_request *ar = talloc_get_type(req->context,
686                                                struct replmd_replicated_request);
687         int ret;
688
689         if (!ares) {
690                 return ldb_module_done(ar->req, NULL, NULL,
691                                         LDB_ERR_OPERATIONS_ERROR);
692         }
693         if (ares->error != LDB_SUCCESS) {
694                 return ldb_module_done(ar->req, ares->controls,
695                                         ares->response, ares->error);
696         }
697
698         if (ares->type != LDB_REPLY_DONE) {
699                 ldb_set_errstring(ar->module->ldb, "Invalid reply type\n!");
700                 return ldb_module_done(ar->req, NULL, NULL,
701                                         LDB_ERR_OPERATIONS_ERROR);
702         }
703
704         talloc_free(ares);
705         ar->index_current++;
706
707         ret = replmd_replicated_apply_next(ar);
708         if (ret != LDB_SUCCESS) {
709                 return ldb_module_done(ar->req, NULL, NULL, ret);
710         }
711
712         return LDB_SUCCESS;
713 }
714
715 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
716 {
717         struct ldb_request *change_req;
718         enum ndr_err_code ndr_err;
719         struct ldb_message *msg;
720         struct replPropertyMetaDataBlob *rmd;
721         struct replPropertyMetaDataBlob omd;
722         const struct ldb_val *omd_value;
723         struct replPropertyMetaDataBlob nmd;
724         struct ldb_val nmd_value;
725         uint32_t i,j,ni=0;
726         uint32_t removed_attrs = 0;
727         uint64_t seq_num;
728         int ret;
729
730         msg = ar->objs->objects[ar->index_current].msg;
731         rmd = ar->objs->objects[ar->index_current].meta_data;
732         ZERO_STRUCT(omd);
733         omd.version = 1;
734
735         /*
736          * TODO: add rename conflict handling
737          */
738         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
739                 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
740                               ar->index_current);
741                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
742                           ldb_dn_get_linearized(ar->search_msg->dn),
743                           ldb_dn_get_linearized(msg->dn));
744                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
745         }
746
747         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
748         if (ret != LDB_SUCCESS) {
749                 return replmd_replicated_request_error(ar, ret);
750         }
751
752         /* find existing meta data */
753         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
754         if (omd_value) {
755                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
756                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &omd,
757                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
758                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
759                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
760                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
761                 }
762
763                 if (omd.version != 1) {
764                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
765                 }
766         }
767
768         ZERO_STRUCT(nmd);
769         nmd.version = 1;
770         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
771         nmd.ctr.ctr1.array = talloc_array(ar,
772                                           struct replPropertyMetaData1,
773                                           nmd.ctr.ctr1.count);
774         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
775
776         /* first copy the old meta data */
777         for (i=0; i < omd.ctr.ctr1.count; i++) {
778                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
779                 ni++;
780         }
781
782         /* now merge in the new meta data */
783         for (i=0; i < rmd->ctr.ctr1.count; i++) {
784                 bool found = false;
785
786                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
787
788                 for (j=0; j < ni; j++) {
789                         int cmp;
790
791                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
792                                 continue;
793                         }
794
795                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
796                                                                             &nmd.ctr.ctr1.array[j]);
797                         if (cmp > 0) {
798                                 /* replace the entry */
799                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
800                                 found = true;
801                                 break;
802                         }
803
804                         /* we don't want to apply this change so remove the attribute */
805                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
806                         removed_attrs++;
807
808                         found = true;
809                         break;
810                 }
811
812                 if (found) continue;
813
814                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
815                 ni++;
816         }
817
818         /*
819          * finally correct the size of the meta_data array
820          */
821         nmd.ctr.ctr1.count = ni;
822
823         /*
824          * the rdn attribute (the alias for the name attribute),
825          * 'cn' for most objects is the last entry in the meta data array
826          * we have stored
827          *
828          * sort the new meta data array
829          */
830         {
831                 struct replPropertyMetaData1 *rdn_p;
832                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
833
834                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
835                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
836         }
837
838         /* create the meta data value */
839         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
840                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
841                                        &nmd,
842                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
843         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
844                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
845                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
846         }
847
848         /*
849          * check if some replicated attributes left, otherwise skip the ldb_modify() call
850          */
851         if (msg->num_elements == 0) {
852                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
853                           ar->index_current);
854
855                 ar->index_current++;
856                 return replmd_replicated_apply_next(ar);
857         }
858
859         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
860                   ar->index_current, msg->num_elements);
861
862         /*
863          * when we know that we'll modify the record, add the whenChanged, uSNChanged
864          * and replPopertyMetaData attributes
865          */
866         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
867         if (ret != LDB_SUCCESS) {
868                 return replmd_replicated_request_error(ar, ret);
869         }
870         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
871         if (ret != LDB_SUCCESS) {
872                 return replmd_replicated_request_error(ar, ret);
873         }
874         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
875         if (ret != LDB_SUCCESS) {
876                 return replmd_replicated_request_error(ar, ret);
877         }
878
879         replmd_ldb_message_sort(msg, ar->schema);
880
881         /* we want to replace the old values */
882         for (i=0; i < msg->num_elements; i++) {
883                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
884         }
885
886         ret = ldb_build_mod_req(&change_req,
887                                 ar->module->ldb,
888                                 ar,
889                                 msg,
890                                 ar->controls,
891                                 ar,
892                                 replmd_replicated_apply_merge_callback,
893                                 ar->req);
894         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
895
896         return ldb_next_request(ar->module, change_req);
897 }
898
899 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
900                                                    struct ldb_reply *ares)
901 {
902         struct replmd_replicated_request *ar = talloc_get_type(req->context,
903                                                struct replmd_replicated_request);
904         int ret;
905
906         if (!ares) {
907                 return ldb_module_done(ar->req, NULL, NULL,
908                                         LDB_ERR_OPERATIONS_ERROR);
909         }
910         if (ares->error != LDB_SUCCESS &&
911             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
912                 return ldb_module_done(ar->req, ares->controls,
913                                         ares->response, ares->error);
914         }
915
916         switch (ares->type) {
917         case LDB_REPLY_ENTRY:
918                 ar->search_msg = talloc_steal(ar, ares->message);
919                 break;
920
921         case LDB_REPLY_REFERRAL:
922                 /* we ignore referrals */
923                 break;
924
925         case LDB_REPLY_DONE:
926                 if (ar->search_msg != NULL) {
927                         ret = replmd_replicated_apply_merge(ar);
928                 } else {
929                         ret = replmd_replicated_apply_add(ar);
930                 }
931                 if (ret != LDB_SUCCESS) {
932                         return ldb_module_done(ar->req, NULL, NULL, ret);
933                 }
934         }
935
936         talloc_free(ares);
937         return LDB_SUCCESS;
938 }
939
940 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
941
942 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
943 {
944         int ret;
945         char *tmp_str;
946         char *filter;
947         struct ldb_request *search_req;
948
949         if (ar->index_current >= ar->objs->num_objects) {
950                 /* done with it, go to the last op */
951                 return replmd_replicated_uptodate_vector(ar);
952         }
953
954         ar->search_msg = NULL;
955
956         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
957         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
958
959         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
960         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
961         talloc_free(tmp_str);
962
963         ret = ldb_build_search_req(&search_req,
964                                    ar->module->ldb,
965                                    ar,
966                                    ar->objs->partition_dn,
967                                    LDB_SCOPE_SUBTREE,
968                                    filter,
969                                    NULL,
970                                    NULL,
971                                    ar,
972                                    replmd_replicated_apply_search_callback,
973                                    ar->req);
974         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
975
976         return ldb_next_request(ar->module, search_req);
977 }
978
979 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
980                                                       struct ldb_reply *ares)
981 {
982         struct replmd_replicated_request *ar = talloc_get_type(req->context,
983                                                struct replmd_replicated_request);
984
985         if (!ares) {
986                 return ldb_module_done(ar->req, NULL, NULL,
987                                         LDB_ERR_OPERATIONS_ERROR);
988         }
989         if (ares->error != LDB_SUCCESS) {
990                 return ldb_module_done(ar->req, ares->controls,
991                                         ares->response, ares->error);
992         }
993
994         if (ares->type != LDB_REPLY_DONE) {
995                 ldb_set_errstring(ar->module->ldb, "Invalid reply type\n!");
996                 return ldb_module_done(ar->req, NULL, NULL,
997                                         LDB_ERR_OPERATIONS_ERROR);
998         }
999
1000         talloc_free(ares);
1001
1002         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1003 }
1004
1005 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1006                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1007 {
1008         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1009 }
1010
1011 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1012 {
1013         struct ldb_request *change_req;
1014         enum ndr_err_code ndr_err;
1015         struct ldb_message *msg;
1016         struct replUpToDateVectorBlob ouv;
1017         const struct ldb_val *ouv_value;
1018         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1019         struct replUpToDateVectorBlob nuv;
1020         struct ldb_val nuv_value;
1021         struct ldb_message_element *nuv_el = NULL;
1022         const struct GUID *our_invocation_id;
1023         struct ldb_message_element *orf_el = NULL;
1024         struct repsFromToBlob nrf;
1025         struct ldb_val *nrf_value = NULL;
1026         struct ldb_message_element *nrf_el = NULL;
1027         uint32_t i,j,ni=0;
1028         uint64_t seq_num;
1029         bool found = false;
1030         time_t t = time(NULL);
1031         NTTIME now;
1032         int ret;
1033
1034         ruv = ar->objs->uptodateness_vector;
1035         ZERO_STRUCT(ouv);
1036         ouv.version = 2;
1037         ZERO_STRUCT(nuv);
1038         nuv.version = 2;
1039
1040         unix_to_nt_time(&now, t);
1041
1042         /* 
1043          * we use the next sequence number for our own highest_usn
1044          * because we will do a modify request and this will increment
1045          * our highest_usn
1046          */
1047         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
1048         if (ret != LDB_SUCCESS) {
1049                 return replmd_replicated_request_error(ar, ret);
1050         }
1051
1052         /*
1053          * first create the new replUpToDateVector
1054          */
1055         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1056         if (ouv_value) {
1057                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1058                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &ouv,
1059                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1060                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1061                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1062                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1063                 }
1064
1065                 if (ouv.version != 2) {
1066                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1067                 }
1068         }
1069
1070         /*
1071          * the new uptodateness vector will at least
1072          * contain 1 entry, one for the source_dsa
1073          *
1074          * plus optional values from our old vector and the one from the source_dsa
1075          */
1076         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1077         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1078         nuv.ctr.ctr2.cursors = talloc_array(ar,
1079                                             struct drsuapi_DsReplicaCursor2,
1080                                             nuv.ctr.ctr2.count);
1081         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1082
1083         /* first copy the old vector */
1084         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1085                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1086                 ni++;
1087         }
1088
1089         /* get our invocation_id if we have one already attached to the ldb */
1090         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1091
1092         /* merge in the source_dsa vector is available */
1093         for (i=0; (ruv && i < ruv->count); i++) {
1094                 found = false;
1095
1096                 if (our_invocation_id &&
1097                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1098                                our_invocation_id)) {
1099                         continue;
1100                 }
1101
1102                 for (j=0; j < ni; j++) {
1103                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1104                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1105                                 continue;
1106                         }
1107
1108                         found = true;
1109
1110                         /*
1111                          * we update only the highest_usn and not the latest_sync_success time,
1112                          * because the last success stands for direct replication
1113                          */
1114                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1115                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1116                         }
1117                         break;                  
1118                 }
1119
1120                 if (found) continue;
1121
1122                 /* if it's not there yet, add it */
1123                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1124                 ni++;
1125         }
1126
1127         /*
1128          * merge in the current highwatermark for the source_dsa
1129          */
1130         found = false;
1131         for (j=0; j < ni; j++) {
1132                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1133                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1134                         continue;
1135                 }
1136
1137                 found = true;
1138
1139                 /*
1140                  * here we update the highest_usn and last_sync_success time
1141                  * because we're directly replicating from the source_dsa
1142                  *
1143                  * and use the tmp_highest_usn because this is what we have just applied
1144                  * to our ldb
1145                  */
1146                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1147                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1148                 break;
1149         }
1150         if (!found) {
1151                 /*
1152                  * here we update the highest_usn and last_sync_success time
1153                  * because we're directly replicating from the source_dsa
1154                  *
1155                  * and use the tmp_highest_usn because this is what we have just applied
1156                  * to our ldb
1157                  */
1158                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1159                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1160                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1161                 ni++;
1162         }
1163
1164         /*
1165          * finally correct the size of the cursors array
1166          */
1167         nuv.ctr.ctr2.count = ni;
1168
1169         /*
1170          * sort the cursors
1171          */
1172         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1173               sizeof(struct drsuapi_DsReplicaCursor2),
1174               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1175
1176         /*
1177          * create the change ldb_message
1178          */
1179         msg = ldb_msg_new(ar);
1180         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1181         msg->dn = ar->search_msg->dn;
1182
1183         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1184                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), 
1185                                        &nuv,
1186                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1187         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1188                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1189                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1190         }
1191         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1192         if (ret != LDB_SUCCESS) {
1193                 return replmd_replicated_request_error(ar, ret);
1194         }
1195         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1196
1197         /*
1198          * now create the new repsFrom value from the given repsFromTo1 structure
1199          */
1200         ZERO_STRUCT(nrf);
1201         nrf.version                                     = 1;
1202         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1203         /* and fix some values... */
1204         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1205         nrf.ctr.ctr1.last_success                       = now;
1206         nrf.ctr.ctr1.last_attempt                       = now;
1207         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1208         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1209
1210         /*
1211          * first see if we already have a repsFrom value for the current source dsa
1212          * if so we'll later replace this value
1213          */
1214         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1215         if (orf_el) {
1216                 for (i=0; i < orf_el->num_values; i++) {
1217                         struct repsFromToBlob *trf;
1218
1219                         trf = talloc(ar, struct repsFromToBlob);
1220                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1221
1222                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), trf,
1223                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1224                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1225                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1226                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1227                         }
1228
1229                         if (trf->version != 1) {
1230                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1231                         }
1232
1233                         /*
1234                          * we compare the source dsa objectGUID not the invocation_id
1235                          * because we want only one repsFrom value per source dsa
1236                          * and when the invocation_id of the source dsa has changed we don't need 
1237                          * the old repsFrom with the old invocation_id
1238                          */
1239                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1240                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1241                                 talloc_free(trf);
1242                                 continue;
1243                         }
1244
1245                         talloc_free(trf);
1246                         nrf_value = &orf_el->values[i];
1247                         break;
1248                 }
1249
1250                 /*
1251                  * copy over all old values to the new ldb_message
1252                  */
1253                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1254                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1255                 *nrf_el = *orf_el;
1256         }
1257
1258         /*
1259          * if we haven't found an old repsFrom value for the current source dsa
1260          * we'll add a new value
1261          */
1262         if (!nrf_value) {
1263                 struct ldb_val zero_value;
1264                 ZERO_STRUCT(zero_value);
1265                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1266                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1267
1268                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1269         }
1270
1271         /* we now fill the value which is already attached to ldb_message */
1272         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1273                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
1274                                        &nrf,
1275                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1276         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1277                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1278                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1279         }
1280
1281         /* 
1282          * the ldb_message_element for the attribute, has all the old values and the new one
1283          * so we'll replace the whole attribute with all values
1284          */
1285         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1286
1287         /* prepare the ldb_modify() request */
1288         ret = ldb_build_mod_req(&change_req,
1289                                 ar->module->ldb,
1290                                 ar,
1291                                 msg,
1292                                 ar->controls,
1293                                 ar,
1294                                 replmd_replicated_uptodate_modify_callback,
1295                                 ar->req);
1296         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1297
1298         return ldb_next_request(ar->module, change_req);
1299 }
1300
1301 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1302                                                       struct ldb_reply *ares)
1303 {
1304         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1305                                                struct replmd_replicated_request);
1306         int ret;
1307
1308         if (!ares) {
1309                 return ldb_module_done(ar->req, NULL, NULL,
1310                                         LDB_ERR_OPERATIONS_ERROR);
1311         }
1312         if (ares->error != LDB_SUCCESS &&
1313             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1314                 return ldb_module_done(ar->req, ares->controls,
1315                                         ares->response, ares->error);
1316         }
1317
1318         switch (ares->type) {
1319         case LDB_REPLY_ENTRY:
1320                 ar->search_msg = talloc_steal(ar, ares->message);
1321                 break;
1322
1323         case LDB_REPLY_REFERRAL:
1324                 /* we ignore referrals */
1325                 break;
1326
1327         case LDB_REPLY_DONE:
1328                 if (ar->search_msg == NULL) {
1329                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1330                 } else {
1331                         ret = replmd_replicated_uptodate_modify(ar);
1332                 }
1333                 if (ret != LDB_SUCCESS) {
1334                         return ldb_module_done(ar->req, NULL, NULL, ret);
1335                 }
1336         }
1337
1338         talloc_free(ares);
1339         return LDB_SUCCESS;
1340 }
1341
1342
1343 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1344 {
1345         int ret;
1346         static const char *attrs[] = {
1347                 "replUpToDateVector",
1348                 "repsFrom",
1349                 NULL
1350         };
1351         struct ldb_request *search_req;
1352
1353         ar->search_msg = NULL;
1354
1355         ret = ldb_build_search_req(&search_req,
1356                                    ar->module->ldb,
1357                                    ar,
1358                                    ar->objs->partition_dn,
1359                                    LDB_SCOPE_BASE,
1360                                    "(objectClass=*)",
1361                                    attrs,
1362                                    NULL,
1363                                    ar,
1364                                    replmd_replicated_uptodate_search_callback,
1365                                    ar->req);
1366         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1367
1368         return ldb_next_request(ar->module, search_req);
1369 }
1370
1371 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1372 {
1373         struct dsdb_extended_replicated_objects *objs;
1374         struct replmd_replicated_request *ar;
1375         struct ldb_control **ctrls;
1376         int ret;
1377
1378         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1379
1380         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1381         if (!objs) {
1382                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1383                 return LDB_ERR_PROTOCOL_ERROR;
1384         }
1385
1386         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1387                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1388                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1389                 return LDB_ERR_PROTOCOL_ERROR;
1390         }
1391
1392         ar = replmd_ctx_init(module, req);
1393         if (!ar)
1394                 return LDB_ERR_OPERATIONS_ERROR;
1395
1396         ar->objs = objs;
1397         ar->schema = dsdb_get_schema(module->ldb);
1398         if (!ar->schema) {
1399                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1400                 talloc_free(ar);
1401                 return LDB_ERR_CONSTRAINT_VIOLATION;
1402         }
1403
1404         ctrls = req->controls;
1405
1406         if (req->controls) {
1407                 req->controls = talloc_memdup(ar, req->controls,
1408                                               talloc_get_size(req->controls));
1409                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1410         }
1411
1412         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1413         if (ret != LDB_SUCCESS) {
1414                 return ret;
1415         }
1416
1417         ar->controls = req->controls;
1418         req->controls = ctrls;
1419
1420         return replmd_replicated_apply_next(ar);
1421 }
1422
1423 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1424 {
1425         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1426                 return replmd_extended_replicated_objects(module, req);
1427         }
1428
1429         return ldb_next_request(module, req);
1430 }
1431
1432 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
1433         .name          = "repl_meta_data",
1434         .add           = replmd_add,
1435         .modify        = replmd_modify,
1436         .extended      = replmd_extended,
1437 };