s4-repl: make sure we marshal the replPropertyMetaData after the last change
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct ldb_dn *dn;
55                 struct GUID guid;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
/*
  one element of the la_list kept in struct replmd_private
 */
struct la_entry {
        /* list linkage; NOTE(review): presumably managed with the dlinklist.h macros - confirm */
        struct la_entry *next;
        struct la_entry *prev;
        /* the linked attribute carried by this element */
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
  per-request context, created by replmd_ctx_init()
 */
struct replmd_replicated_request {
        /* the module instance handling the request */
        struct ldb_module *module;
        /* the original request this context belongs to */
        struct ldb_request *req;

        /* schema in use for this request */
        const struct dsdb_schema *schema;

        /* NOTE(review): presumably the replicated objects being applied - not used in the visible code */
        struct dsdb_extended_replicated_objects *objs;

        /* the controls we pass down */
        struct ldb_control **controls;

        /* NOTE(review): presumably the index of the object currently processed in objs - confirm */
        uint32_t index_current;

        /* NOTE(review): presumably the result of a search for the existing object - confirm */
        struct ldb_message *search_msg;
};
80
81
82 /*
83   initialise the module
84   allocate the private structure and build the list
85   of partition DNs for use by replmd_notify()
86  */
87 static int replmd_init(struct ldb_module *module)
88 {
89         struct replmd_private *replmd_private;
90         struct ldb_context *ldb = ldb_module_get_ctx(module);
91
92         replmd_private = talloc_zero(module, struct replmd_private);
93         if (replmd_private == NULL) {
94                 ldb_oom(ldb);
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97         ldb_module_set_private(module, replmd_private);
98
99         return ldb_next_init(module);
100 }
101
102
103 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
104 {
105         return ldb_dn_compare(n1->dn, n2->dn);
106 }
107
108 /*
109   build the list of partition DNs for use by replmd_notify()
110  */
111 static int replmd_load_NCs(struct ldb_module *module)
112 {
113         const char *attrs[] = { "namingContexts", NULL };
114         struct ldb_result *res = NULL;
115         int i, ret;
116         TALLOC_CTX *tmp_ctx;
117         struct ldb_context *ldb;
118         struct ldb_message_element *el;
119         struct replmd_private *replmd_private = 
120                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
121
122         if (replmd_private->ncs != NULL) {
123                 return LDB_SUCCESS;
124         }
125
126         ldb = ldb_module_get_ctx(module);
127         tmp_ctx = talloc_new(module);
128
129         /* load the list of naming contexts */
130         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
131                          LDB_SCOPE_BASE, attrs, NULL);
132         if (ret != LDB_SUCCESS ||
133             res->count != 1) {
134                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137
138         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
139         if (el == NULL) {
140                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
141                 return LDB_ERR_OPERATIONS_ERROR;                
142         }
143
144         replmd_private->num_ncs = el->num_values;
145         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
146                                            replmd_private->num_ncs);
147         if (replmd_private->ncs == NULL) {
148                 ldb_oom(ldb);
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151
152         for (i=0; i<replmd_private->num_ncs; i++) {
153                 replmd_private->ncs[i].dn = 
154                         ldb_dn_from_ldb_val(replmd_private->ncs, 
155                                             ldb, &el->values[i]);
156                 replmd_private->ncs[i].mod_usn = 0;
157         }
158
159         talloc_free(res);
160
161         /* now find the GUIDs of each of those DNs */
162         for (i=0; i<replmd_private->num_ncs; i++) {
163                 const char *attrs2[] = { "objectGUID", NULL };
164                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
165                                  LDB_SCOPE_BASE, attrs2, NULL);
166                 if (ret != LDB_SUCCESS ||
167                     res->count != 1) {
168                         /* this happens when the schema is first being
169                            setup */
170                         talloc_free(replmd_private->ncs);
171                         replmd_private->ncs = NULL;
172                         replmd_private->num_ncs = 0;
173                         talloc_free(tmp_ctx);
174                         return LDB_SUCCESS;
175                 }
176                 replmd_private->ncs[i].guid = 
177                         samdb_result_guid(res->msgs[0], "objectGUID");
178                 talloc_free(res);
179         }       
180
181         /* sort the NCs into order, most to least specific */
182         qsort(replmd_private->ncs, replmd_private->num_ncs,
183               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
184
185         
186         talloc_free(tmp_ctx);
187         
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193  * notify the repl task that a object has changed. The notifies are
194  * gathered up in the replmd_private structure then written to the
195  * @REPLCHANGED object in each partition during the prepare_commit
196  */
197 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
198 {
199         int ret, i;
200         struct replmd_private *replmd_private = 
201                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
202
203         ret = replmd_load_NCs(module);
204         if (ret != LDB_SUCCESS) {
205                 return ret;
206         }
207         if (replmd_private->num_ncs == 0) {
208                 return LDB_SUCCESS;
209         }
210
211         for (i=0; i<replmd_private->num_ncs; i++) {
212                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
213                         break;
214                 }
215         }
216         if (i == replmd_private->num_ncs) {
217                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
218                          ldb_dn_get_linearized(dn)));
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         if (uSN > replmd_private->ncs[i].mod_usn) {
223             replmd_private->ncs[i].mod_usn = uSN;
224         }
225
226         return LDB_SUCCESS;
227 }
228
229
230 /*
231  * update a @REPLCHANGED record in each partition if there have been
232  * any writes of replicated data in the partition
233  */
234 static int replmd_notify_store(struct ldb_module *module)
235 {
236         int i;
237         struct replmd_private *replmd_private = 
238                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
239         struct ldb_context *ldb = ldb_module_get_ctx(module);
240
241         for (i=0; i<replmd_private->num_ncs; i++) {
242                 int ret;
243
244                 if (replmd_private->ncs[i].mod_usn == 0) {
245                         /* this partition has not changed in this
246                            transaction */
247                         continue;
248                 }
249
250                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
251                                               replmd_private->ncs[i].mod_usn);
252                 if (ret != LDB_SUCCESS) {
253                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
254                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
255                         return ret;
256                 }
257         }
258
259         return LDB_SUCCESS;
260 }
261
262
263 /*
264   created a replmd_replicated_request context
265  */
266 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
267                                                          struct ldb_request *req)
268 {
269         struct ldb_context *ldb;
270         struct replmd_replicated_request *ac;
271
272         ldb = ldb_module_get_ctx(module);
273
274         ac = talloc_zero(req, struct replmd_replicated_request);
275         if (ac == NULL) {
276                 ldb_oom(ldb);
277                 return NULL;
278         }
279
280         ac->module = module;
281         ac->req = req;
282         return ac;
283 }
284
285 /*
286   add a time element to a record
287 */
288 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
289 {
290         struct ldb_message_element *el;
291         char *s;
292
293         if (ldb_msg_find_element(msg, attr) != NULL) {
294                 return LDB_SUCCESS;
295         }
296
297         s = ldb_timestring(msg, t);
298         if (s == NULL) {
299                 return LDB_ERR_OPERATIONS_ERROR;
300         }
301
302         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305
306         el = ldb_msg_find_element(msg, attr);
307         /* always set as replace. This works because on add ops, the flag
308            is ignored */
309         el->flags = LDB_FLAG_MOD_REPLACE;
310
311         return LDB_SUCCESS;
312 }
313
314 /*
315   add a uint64_t element to a record
316 */
317 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
318 {
319         struct ldb_message_element *el;
320
321         if (ldb_msg_find_element(msg, attr) != NULL) {
322                 return LDB_SUCCESS;
323         }
324
325         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
326                 return LDB_ERR_OPERATIONS_ERROR;
327         }
328
329         el = ldb_msg_find_element(msg, attr);
330         /* always set as replace. This works because on add ops, the flag
331            is ignored */
332         el->flags = LDB_FLAG_MOD_REPLACE;
333
334         return LDB_SUCCESS;
335 }
336
337 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
338                                                    const struct replPropertyMetaData1 *m2,
339                                                    const uint32_t *rdn_attid)
340 {
341         if (m1->attid == m2->attid) {
342                 return 0;
343         }
344
345         /*
346          * the rdn attribute should be at the end!
347          * so we need to return a value greater than zero
348          * which means m1 is greater than m2
349          */
350         if (m1->attid == *rdn_attid) {
351                 return 1;
352         }
353
354         /*
355          * the rdn attribute should be at the end!
356          * so we need to return a value less than zero
357          * which means m2 is greater than m1
358          */
359         if (m2->attid == *rdn_attid) {
360                 return -1;
361         }
362
363         return m1->attid - m2->attid;
364 }
365
366 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
367                                                  const uint32_t *rdn_attid)
368 {
369         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
370                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
371 }
372
373 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
374                                                  const struct ldb_message_element *e2,
375                                                  const struct dsdb_schema *schema)
376 {
377         const struct dsdb_attribute *a1;
378         const struct dsdb_attribute *a2;
379
380         /* 
381          * TODO: make this faster by caching the dsdb_attribute pointer
382          *       on the ldb_messag_element
383          */
384
385         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
386         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
387
388         /*
389          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
390          *       in the schema
391          */
392         if (!a1 || !a2) {
393                 return strcasecmp(e1->name, e2->name);
394         }
395
396         return a1->attributeID_id - a2->attributeID_id;
397 }
398
399 static void replmd_ldb_message_sort(struct ldb_message *msg,
400                                     const struct dsdb_schema *schema)
401 {
402         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
403                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
404 }
405
406 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
407 {
408         struct ldb_context *ldb;
409         struct replmd_replicated_request *ac;
410
411         ac = talloc_get_type(req->context, struct replmd_replicated_request);
412         ldb = ldb_module_get_ctx(ac->module);
413
414         if (!ares) {
415                 return ldb_module_done(ac->req, NULL, NULL,
416                                         LDB_ERR_OPERATIONS_ERROR);
417         }
418         if (ares->error != LDB_SUCCESS) {
419                 return ldb_module_done(ac->req, ares->controls,
420                                         ares->response, ares->error);
421         }
422
423         if (ares->type != LDB_REPLY_DONE) {
424                 ldb_set_errstring(ldb,
425                                   "invalid ldb_reply_type in callback");
426                 talloc_free(ares);
427                 return ldb_module_done(ac->req, NULL, NULL,
428                                         LDB_ERR_OPERATIONS_ERROR);
429         }
430
431         return ldb_module_done(ac->req, ares->controls,
432                                 ares->response, LDB_SUCCESS);
433 }
434
435 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
436 {
437         struct ldb_context *ldb;
438         struct replmd_replicated_request *ac;
439         const struct dsdb_schema *schema;
440         enum ndr_err_code ndr_err;
441         struct ldb_request *down_req;
442         struct ldb_message *msg;
443         const struct dsdb_attribute *rdn_attr = NULL;
444         struct GUID guid;
445         struct ldb_val guid_value;
446         struct replPropertyMetaDataBlob nmd;
447         struct ldb_val nmd_value;
448         uint64_t seq_num;
449         const struct GUID *our_invocation_id;
450         time_t t = time(NULL);
451         NTTIME now;
452         char *time_str;
453         int ret;
454         uint32_t i, ni=0;
455
456         /* do not manipulate our control entries */
457         if (ldb_dn_is_special(req->op.add.message->dn)) {
458                 return ldb_next_request(module, req);
459         }
460
461         ldb = ldb_module_get_ctx(module);
462
463         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
464
465         schema = dsdb_get_schema(ldb);
466         if (!schema) {
467                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
468                               "replmd_add: no dsdb_schema loaded");
469                 return LDB_ERR_CONSTRAINT_VIOLATION;
470         }
471
472         ac = replmd_ctx_init(module, req);
473         if (!ac) {
474                 return LDB_ERR_OPERATIONS_ERROR;
475         }
476
477         ac->schema = schema;
478
479         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
480                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
481                               "replmd_add: it's not allowed to add an object with objectGUID\n");
482                 return LDB_ERR_UNWILLING_TO_PERFORM;
483         }
484
485         /* Get a sequence number from the backend */
486         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
487         if (ret != LDB_SUCCESS) {
488                 return ret;
489         }
490
491         /* a new GUID */
492         guid = GUID_random();
493
494         /* get our invocationId */
495         our_invocation_id = samdb_ntds_invocation_id(ldb);
496         if (!our_invocation_id) {
497                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
498                               "replmd_add: unable to find invocationId\n");
499                 return LDB_ERR_OPERATIONS_ERROR;
500         }
501
502         /* we have to copy the message as the caller might have it as a const */
503         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
504         if (msg == NULL) {
505                 ldb_oom(ldb);
506                 return LDB_ERR_OPERATIONS_ERROR;
507         }
508
509         /* generated times */
510         unix_to_nt_time(&now, t);
511         time_str = ldb_timestring(msg, t);
512         if (!time_str) {
513                 return LDB_ERR_OPERATIONS_ERROR;
514         }
515
516         /* 
517          * remove autogenerated attributes
518          */
519         ldb_msg_remove_attr(msg, "whenCreated");
520         ldb_msg_remove_attr(msg, "whenChanged");
521         ldb_msg_remove_attr(msg, "uSNCreated");
522         ldb_msg_remove_attr(msg, "uSNChanged");
523         ldb_msg_remove_attr(msg, "replPropertyMetaData");
524
525         /*
526          * readd replicated attributes
527          */
528         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
529         if (ret != LDB_SUCCESS) {
530                 ldb_oom(ldb);
531                 return LDB_ERR_OPERATIONS_ERROR;
532         }
533
534         /* build the replication meta_data */
535         ZERO_STRUCT(nmd);
536         nmd.version             = 1;
537         nmd.ctr.ctr1.count      = msg->num_elements;
538         nmd.ctr.ctr1.array      = talloc_array(msg,
539                                                struct replPropertyMetaData1,
540                                                nmd.ctr.ctr1.count);
541         if (!nmd.ctr.ctr1.array) {
542                 ldb_oom(ldb);
543                 return LDB_ERR_OPERATIONS_ERROR;
544         }
545
546         for (i=0; i < msg->num_elements; i++) {
547                 struct ldb_message_element *e = &msg->elements[i];
548                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
549                 const struct dsdb_attribute *sa;
550
551                 if (e->name[0] == '@') continue;
552
553                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
554                 if (!sa) {
555                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
556                                       "replmd_add: attribute '%s' not defined in schema\n",
557                                       e->name);
558                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
559                 }
560
561                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
562                         /* if the attribute is not replicated (0x00000001)
563                          * or constructed (0x00000004) it has no metadata
564                          */
565                         continue;
566                 }
567
568                 m->attid                        = sa->attributeID_id;
569                 m->version                      = 1;
570                 m->originating_change_time      = now;
571                 m->originating_invocation_id    = *our_invocation_id;
572                 m->originating_usn              = seq_num;
573                 m->local_usn                    = seq_num;
574                 ni++;
575
576                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
577                         rdn_attr = sa;
578                 }
579         }
580
581         /* fix meta data count */
582         nmd.ctr.ctr1.count = ni;
583
584         /*
585          * sort meta data array, and move the rdn attribute entry to the end
586          */
587         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
588
589         /* generated NDR encoded values */
590         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
591                                        NULL,
592                                        &guid,
593                                        (ndr_push_flags_fn_t)ndr_push_GUID);
594         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
595                 ldb_oom(ldb);
596                 return LDB_ERR_OPERATIONS_ERROR;
597         }
598         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
599                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
600                                        &nmd,
601                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
602         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
603                 ldb_oom(ldb);
604                 return LDB_ERR_OPERATIONS_ERROR;
605         }
606
607         /*
608          * add the autogenerated values
609          */
610         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
611         if (ret != LDB_SUCCESS) {
612                 ldb_oom(ldb);
613                 return LDB_ERR_OPERATIONS_ERROR;
614         }
615         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
616         if (ret != LDB_SUCCESS) {
617                 ldb_oom(ldb);
618                 return LDB_ERR_OPERATIONS_ERROR;
619         }
620         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
621         if (ret != LDB_SUCCESS) {
622                 ldb_oom(ldb);
623                 return LDB_ERR_OPERATIONS_ERROR;
624         }
625         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
626         if (ret != LDB_SUCCESS) {
627                 ldb_oom(ldb);
628                 return LDB_ERR_OPERATIONS_ERROR;
629         }
630         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
631         if (ret != LDB_SUCCESS) {
632                 ldb_oom(ldb);
633                 return LDB_ERR_OPERATIONS_ERROR;
634         }
635
636         /*
637          * sort the attributes by attid before storing the object
638          */
639         replmd_ldb_message_sort(msg, schema);
640
641         ret = ldb_build_add_req(&down_req, ldb, ac,
642                                 msg,
643                                 req->controls,
644                                 ac, replmd_op_callback,
645                                 req);
646         if (ret != LDB_SUCCESS) {
647                 return ret;
648         }
649
650         ret = replmd_notify(module, msg->dn, seq_num);
651         if (ret != LDB_SUCCESS) {
652                 return ret;
653         }
654
655         /* go on with the call chain */
656         return ldb_next_request(module, down_req);
657 }
658
659
660 /*
661  * update the replPropertyMetaData for one element
662  */
663 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
664                                       struct ldb_message *msg,
665                                       struct ldb_message_element *el,
666                                       struct replPropertyMetaDataBlob *omd,
667                                       struct dsdb_schema *schema,
668                                       uint64_t *seq_num,
669                                       const struct GUID *our_invocation_id,
670                                       NTTIME now)
671 {
672         int i;
673         const struct dsdb_attribute *a;
674         struct replPropertyMetaData1 *md1;
675
676         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
677         if (a == NULL) {
678                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
679                          el->name));
680                 return LDB_ERR_OPERATIONS_ERROR;
681         }
682
683         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
684                 /* if the attribute is not replicated (0x00000001)
685                  * or constructed (0x00000004) it has no metadata
686                  */
687                 return LDB_SUCCESS;
688         }
689
690         for (i=0; i<omd->ctr.ctr1.count; i++) {
691                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
692         }
693         if (i == omd->ctr.ctr1.count) {
694                 /* we need to add a new one */
695                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
696                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
697                 if (omd->ctr.ctr1.array == NULL) {
698                         ldb_oom(ldb);
699                         return LDB_ERR_OPERATIONS_ERROR;
700                 }
701                 omd->ctr.ctr1.count++;
702                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
703         }
704
705         /* Get a new sequence number from the backend. We only do this
706          * if we have a change that requires a new
707          * replPropertyMetaData element 
708          */
709         if (*seq_num == 0) {
710                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
711                 if (ret != LDB_SUCCESS) {
712                         return LDB_ERR_OPERATIONS_ERROR;
713                 }
714         }
715
716         md1 = &omd->ctr.ctr1.array[i];
717         md1->version++;
718         md1->attid                     = a->attributeID_id;
719         md1->originating_change_time   = now;
720         md1->originating_invocation_id = *our_invocation_id;
721         md1->originating_usn           = *seq_num;
722         md1->local_usn                 = *seq_num;
723         
724         return LDB_SUCCESS;
725 }
726
727 /*
728  * update the replPropertyMetaData object each time we modify an
729  * object. This is needed for DRS replication, as the merge on the
730  * client is based on this object 
731  */
732 static int replmd_update_rpmd(struct ldb_module *module, 
733                               struct ldb_message *msg, uint64_t *seq_num)
734 {
735         const struct ldb_val *omd_value;
736         enum ndr_err_code ndr_err;
737         struct replPropertyMetaDataBlob omd;
738         int i;
739         struct dsdb_schema *schema;
740         time_t t = time(NULL);
741         NTTIME now;
742         const struct GUID *our_invocation_id;
743         int ret;
744         const char *attrs[] = { "replPropertyMetaData" , NULL };
745         struct ldb_result *res;
746         struct ldb_context *ldb;
747
748         ldb = ldb_module_get_ctx(module);
749
750         our_invocation_id = samdb_ntds_invocation_id(ldb);
751         if (!our_invocation_id) {
752                 /* this happens during an initial vampire while
753                    updating the schema */
754                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
755                 return LDB_SUCCESS;
756         }
757
758         unix_to_nt_time(&now, t);
759
760         /* search for the existing replPropertyMetaDataBlob */
761         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
762         if (ret != LDB_SUCCESS || res->count < 1) {
763                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
764                          ldb_dn_get_linearized(msg->dn)));
765                 return LDB_ERR_OPERATIONS_ERROR;
766         }
767                 
768
769         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
770         if (!omd_value) {
771                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
772                          ldb_dn_get_linearized(msg->dn)));
773                 return LDB_ERR_OPERATIONS_ERROR;
774         }
775
776         ndr_err = ndr_pull_struct_blob(omd_value, msg,
777                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
778                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
779         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
780                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
781                          ldb_dn_get_linearized(msg->dn)));
782                 return LDB_ERR_OPERATIONS_ERROR;
783         }
784
785         if (omd.version != 1) {
786                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
787                          omd.version, ldb_dn_get_linearized(msg->dn)));
788                 return LDB_ERR_OPERATIONS_ERROR;
789         }
790
791         schema = dsdb_get_schema(ldb);
792
793         for (i=0; i<msg->num_elements; i++) {
794                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
795                                                  our_invocation_id, now);
796                 if (ret != LDB_SUCCESS) {
797                         return ret;
798                 }
799         }
800
801         /*
802          * replmd_update_rpmd_element has done an update if the
803          * seq_num is set
804          */
805         if (*seq_num != 0) {
806                 struct ldb_val *md_value;
807                 struct ldb_message_element *el;
808
809                 md_value = talloc(msg, struct ldb_val);
810                 if (md_value == NULL) {
811                         ldb_oom(ldb);
812                         return LDB_ERR_OPERATIONS_ERROR;
813                 }
814
815                 ndr_err = ndr_push_struct_blob(md_value, msg, 
816                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
817                                                &omd,
818                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
819                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
820                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
821                                  ldb_dn_get_linearized(msg->dn)));
822                         return LDB_ERR_OPERATIONS_ERROR;
823                 }
824
825                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
826                 if (ret != LDB_SUCCESS) {
827                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
828                                  ldb_dn_get_linearized(msg->dn)));
829                         return ret;
830                 }
831
832                 ret = replmd_notify(module, msg->dn, *seq_num);
833                 if (ret != LDB_SUCCESS) {
834                         return ret;
835                 }
836
837                 el->num_values = 1;
838                 el->values = md_value;
839         }
840
841         return LDB_SUCCESS;     
842 }
843
844
845 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
846 {
847         struct ldb_context *ldb;
848         struct replmd_replicated_request *ac;
849         const struct dsdb_schema *schema;
850         struct ldb_request *down_req;
851         struct ldb_message *msg;
852         int ret;
853         time_t t = time(NULL);
854         uint64_t seq_num = 0;
855
856         /* do not manipulate our control entries */
857         if (ldb_dn_is_special(req->op.mod.message->dn)) {
858                 return ldb_next_request(module, req);
859         }
860
861         ldb = ldb_module_get_ctx(module);
862
863         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
864
865         schema = dsdb_get_schema(ldb);
866         if (!schema) {
867                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
868                               "replmd_modify: no dsdb_schema loaded");
869                 return LDB_ERR_CONSTRAINT_VIOLATION;
870         }
871
872         ac = replmd_ctx_init(module, req);
873         if (!ac) {
874                 return LDB_ERR_OPERATIONS_ERROR;
875         }
876
877         ac->schema = schema;
878
879         /* we have to copy the message as the caller might have it as a const */
880         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
881         if (msg == NULL) {
882                 talloc_free(ac);
883                 return LDB_ERR_OPERATIONS_ERROR;
884         }
885
886         /* TODO:
887          * - get the whole old object
888          * - if the old object doesn't exist report an error
889          * - give an error when a readonly attribute should
890          *   be modified
891          * - merge the changed into the old object
892          *   if the caller set values to the same value
893          *   ignore the attribute, return success when no
894          *   attribute was changed
895          */
896
897         ret = replmd_update_rpmd(module, msg, &seq_num);
898         if (ret != LDB_SUCCESS) {
899                 return ret;
900         }
901
902         /* TODO:
903          * - sort the attributes by attid with replmd_ldb_message_sort()
904          * - replace the old object with the newly constructed one
905          */
906
907         ret = ldb_build_mod_req(&down_req, ldb, ac,
908                                 msg,
909                                 req->controls,
910                                 ac, replmd_op_callback,
911                                 req);
912         if (ret != LDB_SUCCESS) {
913                 return ret;
914         }
915         talloc_steal(down_req, msg);
916
917         /* we only change whenChanged and uSNChanged if the seq_num
918            has changed */
919         if (seq_num != 0) {
920                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
921                         talloc_free(ac);
922                         return LDB_ERR_OPERATIONS_ERROR;
923                 }
924
925                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
926                         talloc_free(ac);
927                         return LDB_ERR_OPERATIONS_ERROR;
928                 }
929         }
930
931         /* go on with the call chain */
932         return ldb_next_request(module, down_req);
933 }
934
935
936 /*
937   handle a rename request
938
939   On a rename we need to do an extra ldb_modify which sets the
940   whenChanged and uSNChanged attributes
941  */
942 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
943 {
944         struct ldb_context *ldb;
945         int ret, i;
946         time_t t = time(NULL);
947         uint64_t seq_num = 0;
948         struct ldb_message *msg;
949         struct replmd_private *replmd_private = 
950                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
951
952         /* do not manipulate our control entries */
953         if (ldb_dn_is_special(req->op.mod.message->dn)) {
954                 return ldb_next_request(module, req);
955         }
956
957         ldb = ldb_module_get_ctx(module);
958
959         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
960
961         /* Get a sequence number from the backend */
962         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
963         if (ret != LDB_SUCCESS) {
964                 return ret;
965         }
966
967         msg = ldb_msg_new(req);
968         if (msg == NULL) {
969                 ldb_oom(ldb);
970                 return LDB_ERR_OPERATIONS_ERROR;
971         }
972
973         msg->dn = req->op.rename.olddn;
974
975         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
976                 talloc_free(msg);
977                 return LDB_ERR_OPERATIONS_ERROR;
978         }
979         msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
980
981         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
982                 talloc_free(msg);
983                 return LDB_ERR_OPERATIONS_ERROR;
984         }
985         msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
986
987         ret = ldb_modify(ldb, msg);
988         talloc_free(msg);
989         if (ret != LDB_SUCCESS) {
990                 return ret;
991         }
992
993         ret = replmd_load_NCs(module);
994         if (ret != 0) {
995                 return ret;
996         }
997
998         /* now update the highest uSNs of the partitions that are
999            affected. Note that two partitions could be changing */
1000         for (i=0; i<replmd_private->num_ncs; i++) {
1001                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1002                                         req->op.rename.olddn) == 0) {
1003                         break;
1004                 }
1005         }
1006         if (i == replmd_private->num_ncs) {
1007                 DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
1008                          ldb_dn_get_linearized(req->op.rename.olddn)));
1009                 return LDB_ERR_OPERATIONS_ERROR;
1010         }
1011         replmd_private->ncs[i].mod_usn = seq_num;
1012
1013         for (i=0; i<replmd_private->num_ncs; i++) {
1014                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1015                                         req->op.rename.newdn) == 0) {
1016                         break;
1017                 }
1018         }
1019         if (i == replmd_private->num_ncs) {
1020                 DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
1021                          ldb_dn_get_linearized(req->op.rename.newdn)));
1022                 return LDB_ERR_OPERATIONS_ERROR;
1023         }
1024         replmd_private->ncs[i].mod_usn = seq_num;
1025         
1026         /* go on with the call chain */
1027         return ldb_next_request(module, req);
1028 }
1029
1030
/*
 * Single exit point for ldb error codes raised while applying
 * replicated objects; the code is handed back unchanged and the request
 * context is not looked at.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        (void)ar;

        return ret;
}
1035
1036 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1037 {
1038         int ret = LDB_ERR_OTHER;
1039         /* TODO: do some error mapping */
1040         return ret;
1041 }
1042
1043 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1044
1045 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1046                                                 struct ldb_reply *ares)
1047 {
1048         struct ldb_context *ldb;
1049         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1050                                                struct replmd_replicated_request);
1051         int ret;
1052
1053         ldb = ldb_module_get_ctx(ar->module);
1054
1055         if (!ares) {
1056                 return ldb_module_done(ar->req, NULL, NULL,
1057                                         LDB_ERR_OPERATIONS_ERROR);
1058         }
1059         if (ares->error != LDB_SUCCESS) {
1060                 return ldb_module_done(ar->req, ares->controls,
1061                                         ares->response, ares->error);
1062         }
1063
1064         if (ares->type != LDB_REPLY_DONE) {
1065                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1066                 return ldb_module_done(ar->req, NULL, NULL,
1067                                         LDB_ERR_OPERATIONS_ERROR);
1068         }
1069
1070         talloc_free(ares);
1071         ar->index_current++;
1072
1073         ret = replmd_replicated_apply_next(ar);
1074         if (ret != LDB_SUCCESS) {
1075                 return ldb_module_done(ar->req, NULL, NULL, ret);
1076         }
1077
1078         return LDB_SUCCESS;
1079 }
1080
1081 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1082 {
1083         struct ldb_context *ldb;
1084         struct ldb_request *change_req;
1085         enum ndr_err_code ndr_err;
1086         struct ldb_message *msg;
1087         struct replPropertyMetaDataBlob *md;
1088         struct ldb_val md_value;
1089         uint32_t i;
1090         uint64_t seq_num;
1091         int ret;
1092
1093         /*
1094          * TODO: check if the parent object exist
1095          */
1096
1097         /*
1098          * TODO: handle the conflict case where an object with the
1099          *       same name exist
1100          */
1101
1102         ldb = ldb_module_get_ctx(ar->module);
1103         msg = ar->objs->objects[ar->index_current].msg;
1104         md = ar->objs->objects[ar->index_current].meta_data;
1105
1106         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1107         if (ret != LDB_SUCCESS) {
1108                 return replmd_replicated_request_error(ar, ret);
1109         }
1110
1111         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1112         if (ret != LDB_SUCCESS) {
1113                 return replmd_replicated_request_error(ar, ret);
1114         }
1115
1116         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1117         if (ret != LDB_SUCCESS) {
1118                 return replmd_replicated_request_error(ar, ret);
1119         }
1120
1121         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1122         if (ret != LDB_SUCCESS) {
1123                 return replmd_replicated_request_error(ar, ret);
1124         }
1125
1126         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1127         if (ret != LDB_SUCCESS) {
1128                 return replmd_replicated_request_error(ar, ret);
1129         }
1130
1131         ret = replmd_notify(ar->module, msg->dn, seq_num);
1132         if (ret != LDB_SUCCESS) {
1133                 return replmd_replicated_request_error(ar, ret);
1134         }
1135
1136         /*
1137          * the meta data array is already sorted by the caller
1138          */
1139         for (i=0; i < md->ctr.ctr1.count; i++) {
1140                 md->ctr.ctr1.array[i].local_usn = seq_num;
1141         }
1142         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1143                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1144                                        md,
1145                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1146         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1147                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1148                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1149         }
1150         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1151         if (ret != LDB_SUCCESS) {
1152                 return replmd_replicated_request_error(ar, ret);
1153         }
1154
1155         replmd_ldb_message_sort(msg, ar->schema);
1156
1157         if (DEBUGLVL(4)) {
1158                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1159                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1160                 talloc_free(s);
1161         }
1162
1163         ret = ldb_build_add_req(&change_req,
1164                                 ldb,
1165                                 ar,
1166                                 msg,
1167                                 ar->controls,
1168                                 ar,
1169                                 replmd_replicated_apply_add_callback,
1170                                 ar->req);
1171         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1172
1173         return ldb_next_request(ar->module, change_req);
1174 }
1175
1176 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1177                                                          struct replPropertyMetaData1 *m2)
1178 {
1179         int ret;
1180
1181         if (m1->version != m2->version) {
1182                 return m1->version - m2->version;
1183         }
1184
1185         if (m1->originating_change_time != m2->originating_change_time) {
1186                 return m1->originating_change_time - m2->originating_change_time;
1187         }
1188
1189         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1190         if (ret != 0) {
1191                 return ret;
1192         }
1193
1194         return m1->originating_usn - m2->originating_usn;
1195 }
1196
1197 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1198                                                   struct ldb_reply *ares)
1199 {
1200         struct ldb_context *ldb;
1201         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1202                                                struct replmd_replicated_request);
1203         int ret;
1204
1205         ldb = ldb_module_get_ctx(ar->module);
1206
1207         if (!ares) {
1208                 return ldb_module_done(ar->req, NULL, NULL,
1209                                         LDB_ERR_OPERATIONS_ERROR);
1210         }
1211         if (ares->error != LDB_SUCCESS) {
1212                 return ldb_module_done(ar->req, ares->controls,
1213                                         ares->response, ares->error);
1214         }
1215
1216         if (ares->type != LDB_REPLY_DONE) {
1217                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1218                 return ldb_module_done(ar->req, NULL, NULL,
1219                                         LDB_ERR_OPERATIONS_ERROR);
1220         }
1221
1222         talloc_free(ares);
1223         ar->index_current++;
1224
1225         ret = replmd_replicated_apply_next(ar);
1226         if (ret != LDB_SUCCESS) {
1227                 return ldb_module_done(ar->req, NULL, NULL, ret);
1228         }
1229
1230         return LDB_SUCCESS;
1231 }
1232
1233 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1234 {
1235         struct ldb_context *ldb;
1236         struct ldb_request *change_req;
1237         enum ndr_err_code ndr_err;
1238         struct ldb_message *msg;
1239         struct replPropertyMetaDataBlob *rmd;
1240         struct replPropertyMetaDataBlob omd;
1241         const struct ldb_val *omd_value;
1242         struct replPropertyMetaDataBlob nmd;
1243         struct ldb_val nmd_value;
1244         uint32_t i,j,ni=0;
1245         uint32_t removed_attrs = 0;
1246         uint64_t seq_num;
1247         int ret;
1248
1249         ldb = ldb_module_get_ctx(ar->module);
1250         msg = ar->objs->objects[ar->index_current].msg;
1251         rmd = ar->objs->objects[ar->index_current].meta_data;
1252         ZERO_STRUCT(omd);
1253         omd.version = 1;
1254
1255         /*
1256          * TODO: check repl data is correct after a rename
1257          */
1258         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1259                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1260                           ldb_dn_get_linearized(ar->search_msg->dn),
1261                           ldb_dn_get_linearized(msg->dn));
1262                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1263                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1264                                   ldb_dn_get_linearized(ar->search_msg->dn),
1265                                   ldb_dn_get_linearized(msg->dn),
1266                                   ldb_errstring(ldb));
1267                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1268                 }
1269         }
1270
1271         /* find existing meta data */
1272         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1273         if (omd_value) {
1274                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1275                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1276                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1277                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1278                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1279                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1280                 }
1281
1282                 if (omd.version != 1) {
1283                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1284                 }
1285         }
1286
1287         ZERO_STRUCT(nmd);
1288         nmd.version = 1;
1289         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1290         nmd.ctr.ctr1.array = talloc_array(ar,
1291                                           struct replPropertyMetaData1,
1292                                           nmd.ctr.ctr1.count);
1293         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1294
1295         /* first copy the old meta data */
1296         for (i=0; i < omd.ctr.ctr1.count; i++) {
1297                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1298                 ni++;
1299         }
1300
1301         /* now merge in the new meta data */
1302         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1303                 bool found = false;
1304
1305                 for (j=0; j < ni; j++) {
1306                         int cmp;
1307
1308                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1309                                 continue;
1310                         }
1311
1312                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1313                                                                             &nmd.ctr.ctr1.array[j]);
1314                         if (cmp > 0) {
1315                                 /* replace the entry */
1316                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1317                                 found = true;
1318                                 break;
1319                         }
1320
1321                         /* we don't want to apply this change so remove the attribute */
1322                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1323                         removed_attrs++;
1324
1325                         found = true;
1326                         break;
1327                 }
1328
1329                 if (found) continue;
1330
1331                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1332                 ni++;
1333         }
1334
1335         /*
1336          * finally correct the size of the meta_data array
1337          */
1338         nmd.ctr.ctr1.count = ni;
1339
1340         /*
1341          * the rdn attribute (the alias for the name attribute),
1342          * 'cn' for most objects is the last entry in the meta data array
1343          * we have stored
1344          *
1345          * sort the new meta data array
1346          */
1347         {
1348                 struct replPropertyMetaData1 *rdn_p;
1349                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1350
1351                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1352                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1353         }
1354
1355         /*
1356          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1357          */
1358         if (msg->num_elements == 0) {
1359                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1360                           ar->index_current);
1361
1362                 ar->index_current++;
1363                 return replmd_replicated_apply_next(ar);
1364         }
1365
1366         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1367                   ar->index_current, msg->num_elements);
1368
1369         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1370         if (ret != LDB_SUCCESS) {
1371                 return replmd_replicated_request_error(ar, ret);
1372         }
1373
1374         for (i=0; i<ni; i++) {
1375                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1376         }
1377
1378         /* create the meta data value */
1379         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1380                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1381                                        &nmd,
1382                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1383         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1384                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1385                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1386         }
1387
1388         /*
1389          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1390          * and replPopertyMetaData attributes
1391          */
1392         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1393         if (ret != LDB_SUCCESS) {
1394                 return replmd_replicated_request_error(ar, ret);
1395         }
1396         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1397         if (ret != LDB_SUCCESS) {
1398                 return replmd_replicated_request_error(ar, ret);
1399         }
1400         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1401         if (ret != LDB_SUCCESS) {
1402                 return replmd_replicated_request_error(ar, ret);
1403         }
1404
1405         replmd_ldb_message_sort(msg, ar->schema);
1406
1407         /* we want to replace the old values */
1408         for (i=0; i < msg->num_elements; i++) {
1409                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1410         }
1411
1412         ret = replmd_notify(ar->module, msg->dn, seq_num);
1413         if (ret != LDB_SUCCESS) {
1414                 return replmd_replicated_request_error(ar, ret);
1415         }
1416
1417         if (DEBUGLVL(4)) {
1418                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1419                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1420                 talloc_free(s);
1421         }
1422
1423         ret = ldb_build_mod_req(&change_req,
1424                                 ldb,
1425                                 ar,
1426                                 msg,
1427                                 ar->controls,
1428                                 ar,
1429                                 replmd_replicated_apply_merge_callback,
1430                                 ar->req);
1431         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1432
1433         return ldb_next_request(ar->module, change_req);
1434 }
1435
1436 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1437                                                    struct ldb_reply *ares)
1438 {
1439         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1440                                                struct replmd_replicated_request);
1441         int ret;
1442
1443         if (!ares) {
1444                 return ldb_module_done(ar->req, NULL, NULL,
1445                                         LDB_ERR_OPERATIONS_ERROR);
1446         }
1447         if (ares->error != LDB_SUCCESS &&
1448             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1449                 return ldb_module_done(ar->req, ares->controls,
1450                                         ares->response, ares->error);
1451         }
1452
1453         switch (ares->type) {
1454         case LDB_REPLY_ENTRY:
1455                 ar->search_msg = talloc_steal(ar, ares->message);
1456                 break;
1457
1458         case LDB_REPLY_REFERRAL:
1459                 /* we ignore referrals */
1460                 break;
1461
1462         case LDB_REPLY_DONE:
1463                 if (ar->search_msg != NULL) {
1464                         ret = replmd_replicated_apply_merge(ar);
1465                 } else {
1466                         ret = replmd_replicated_apply_add(ar);
1467                 }
1468                 if (ret != LDB_SUCCESS) {
1469                         return ldb_module_done(ar->req, NULL, NULL, ret);
1470                 }
1471         }
1472
1473         talloc_free(ares);
1474         return LDB_SUCCESS;
1475 }
1476
1477 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1478
1479 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1480 {
1481         struct ldb_context *ldb;
1482         int ret;
1483         char *tmp_str;
1484         char *filter;
1485         struct ldb_request *search_req;
1486
1487         if (ar->index_current >= ar->objs->num_objects) {
1488                 /* done with it, go to next stage */
1489                 return replmd_replicated_uptodate_vector(ar);
1490         }
1491
1492         ldb = ldb_module_get_ctx(ar->module);
1493         ar->search_msg = NULL;
1494
1495         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1496         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1497
1498         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1499         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1500         talloc_free(tmp_str);
1501
1502         ret = ldb_build_search_req(&search_req,
1503                                    ldb,
1504                                    ar,
1505                                    ar->objs->partition_dn,
1506                                    LDB_SCOPE_SUBTREE,
1507                                    filter,
1508                                    NULL,
1509                                    NULL,
1510                                    ar,
1511                                    replmd_replicated_apply_search_callback,
1512                                    ar->req);
1513         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1514
1515         return ldb_next_request(ar->module, search_req);
1516 }
1517
1518 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1519                                                       struct ldb_reply *ares)
1520 {
1521         struct ldb_context *ldb;
1522         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1523                                                struct replmd_replicated_request);
1524         ldb = ldb_module_get_ctx(ar->module);
1525
1526         if (!ares) {
1527                 return ldb_module_done(ar->req, NULL, NULL,
1528                                         LDB_ERR_OPERATIONS_ERROR);
1529         }
1530         if (ares->error != LDB_SUCCESS) {
1531                 return ldb_module_done(ar->req, ares->controls,
1532                                         ares->response, ares->error);
1533         }
1534
1535         if (ares->type != LDB_REPLY_DONE) {
1536                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1537                 return ldb_module_done(ar->req, NULL, NULL,
1538                                         LDB_ERR_OPERATIONS_ERROR);
1539         }
1540
1541         talloc_free(ares);
1542
1543         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1544 }
1545
1546 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1547                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1548 {
1549         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1550 }
1551
1552 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1553 {
1554         struct ldb_context *ldb;
1555         struct ldb_request *change_req;
1556         enum ndr_err_code ndr_err;
1557         struct ldb_message *msg;
1558         struct replUpToDateVectorBlob ouv;
1559         const struct ldb_val *ouv_value;
1560         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1561         struct replUpToDateVectorBlob nuv;
1562         struct ldb_val nuv_value;
1563         struct ldb_message_element *nuv_el = NULL;
1564         const struct GUID *our_invocation_id;
1565         struct ldb_message_element *orf_el = NULL;
1566         struct repsFromToBlob nrf;
1567         struct ldb_val *nrf_value = NULL;
1568         struct ldb_message_element *nrf_el = NULL;
1569         uint32_t i,j,ni=0;
1570         bool found = false;
1571         time_t t = time(NULL);
1572         NTTIME now;
1573         int ret;
1574
1575         ldb = ldb_module_get_ctx(ar->module);
1576         ruv = ar->objs->uptodateness_vector;
1577         ZERO_STRUCT(ouv);
1578         ouv.version = 2;
1579         ZERO_STRUCT(nuv);
1580         nuv.version = 2;
1581
1582         unix_to_nt_time(&now, t);
1583
1584         /*
1585          * first create the new replUpToDateVector
1586          */
1587         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1588         if (ouv_value) {
1589                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1590                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1591                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1592                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1593                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1594                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1595                 }
1596
1597                 if (ouv.version != 2) {
1598                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1599                 }
1600         }
1601
1602         /*
1603          * the new uptodateness vector will at least
1604          * contain 1 entry, one for the source_dsa
1605          *
1606          * plus optional values from our old vector and the one from the source_dsa
1607          */
1608         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1609         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1610         nuv.ctr.ctr2.cursors = talloc_array(ar,
1611                                             struct drsuapi_DsReplicaCursor2,
1612                                             nuv.ctr.ctr2.count);
1613         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1614
1615         /* first copy the old vector */
1616         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1617                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1618                 ni++;
1619         }
1620
1621         /* get our invocation_id if we have one already attached to the ldb */
1622         our_invocation_id = samdb_ntds_invocation_id(ldb);
1623
1624         /* merge in the source_dsa vector is available */
1625         for (i=0; (ruv && i < ruv->count); i++) {
1626                 found = false;
1627
1628                 if (our_invocation_id &&
1629                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1630                                our_invocation_id)) {
1631                         continue;
1632                 }
1633
1634                 for (j=0; j < ni; j++) {
1635                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1636                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1637                                 continue;
1638                         }
1639
1640                         found = true;
1641
1642                         /*
1643                          * we update only the highest_usn and not the latest_sync_success time,
1644                          * because the last success stands for direct replication
1645                          */
1646                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1647                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1648                         }
1649                         break;                  
1650                 }
1651
1652                 if (found) continue;
1653
1654                 /* if it's not there yet, add it */
1655                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1656                 ni++;
1657         }
1658
1659         /*
1660          * merge in the current highwatermark for the source_dsa
1661          */
1662         found = false;
1663         for (j=0; j < ni; j++) {
1664                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1665                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1666                         continue;
1667                 }
1668
1669                 found = true;
1670
1671                 /*
1672                  * here we update the highest_usn and last_sync_success time
1673                  * because we're directly replicating from the source_dsa
1674                  *
1675                  * and use the tmp_highest_usn because this is what we have just applied
1676                  * to our ldb
1677                  */
1678                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1679                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1680                 break;
1681         }
1682         if (!found) {
1683                 /*
1684                  * here we update the highest_usn and last_sync_success time
1685                  * because we're directly replicating from the source_dsa
1686                  *
1687                  * and use the tmp_highest_usn because this is what we have just applied
1688                  * to our ldb
1689                  */
1690                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1691                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1692                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1693                 ni++;
1694         }
1695
1696         /*
1697          * finally correct the size of the cursors array
1698          */
1699         nuv.ctr.ctr2.count = ni;
1700
1701         /*
1702          * sort the cursors
1703          */
1704         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1705               sizeof(struct drsuapi_DsReplicaCursor2),
1706               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1707
1708         /*
1709          * create the change ldb_message
1710          */
1711         msg = ldb_msg_new(ar);
1712         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1713         msg->dn = ar->search_msg->dn;
1714
1715         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1716                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1717                                        &nuv,
1718                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1719         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1720                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1721                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1722         }
1723         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1724         if (ret != LDB_SUCCESS) {
1725                 return replmd_replicated_request_error(ar, ret);
1726         }
1727         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1728
1729         /*
1730          * now create the new repsFrom value from the given repsFromTo1 structure
1731          */
1732         ZERO_STRUCT(nrf);
1733         nrf.version                                     = 1;
1734         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1735         /* and fix some values... */
1736         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1737         nrf.ctr.ctr1.last_success                       = now;
1738         nrf.ctr.ctr1.last_attempt                       = now;
1739         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1740         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1741
1742         /*
1743          * first see if we already have a repsFrom value for the current source dsa
1744          * if so we'll later replace this value
1745          */
1746         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1747         if (orf_el) {
1748                 for (i=0; i < orf_el->num_values; i++) {
1749                         struct repsFromToBlob *trf;
1750
1751                         trf = talloc(ar, struct repsFromToBlob);
1752                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1753
1754                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1755                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1756                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1757                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1758                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1759                         }
1760
1761                         if (trf->version != 1) {
1762                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1763                         }
1764
1765                         /*
1766                          * we compare the source dsa objectGUID not the invocation_id
1767                          * because we want only one repsFrom value per source dsa
1768                          * and when the invocation_id of the source dsa has changed we don't need 
1769                          * the old repsFrom with the old invocation_id
1770                          */
1771                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1772                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1773                                 talloc_free(trf);
1774                                 continue;
1775                         }
1776
1777                         talloc_free(trf);
1778                         nrf_value = &orf_el->values[i];
1779                         break;
1780                 }
1781
1782                 /*
1783                  * copy over all old values to the new ldb_message
1784                  */
1785                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1786                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1787                 *nrf_el = *orf_el;
1788         }
1789
1790         /*
1791          * if we haven't found an old repsFrom value for the current source dsa
1792          * we'll add a new value
1793          */
1794         if (!nrf_value) {
1795                 struct ldb_val zero_value;
1796                 ZERO_STRUCT(zero_value);
1797                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1798                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1799
1800                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1801         }
1802
1803         /* we now fill the value which is already attached to ldb_message */
1804         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1805                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1806                                        &nrf,
1807                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1808         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1809                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1810                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1811         }
1812
1813         /* 
1814          * the ldb_message_element for the attribute, has all the old values and the new one
1815          * so we'll replace the whole attribute with all values
1816          */
1817         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1818
1819         if (DEBUGLVL(4)) {
1820                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1821                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1822                 talloc_free(s);
1823         }
1824
1825         /* prepare the ldb_modify() request */
1826         ret = ldb_build_mod_req(&change_req,
1827                                 ldb,
1828                                 ar,
1829                                 msg,
1830                                 ar->controls,
1831                                 ar,
1832                                 replmd_replicated_uptodate_modify_callback,
1833                                 ar->req);
1834         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1835
1836         return ldb_next_request(ar->module, change_req);
1837 }
1838
1839 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1840                                                       struct ldb_reply *ares)
1841 {
1842         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1843                                                struct replmd_replicated_request);
1844         int ret;
1845
1846         if (!ares) {
1847                 return ldb_module_done(ar->req, NULL, NULL,
1848                                         LDB_ERR_OPERATIONS_ERROR);
1849         }
1850         if (ares->error != LDB_SUCCESS &&
1851             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1852                 return ldb_module_done(ar->req, ares->controls,
1853                                         ares->response, ares->error);
1854         }
1855
1856         switch (ares->type) {
1857         case LDB_REPLY_ENTRY:
1858                 ar->search_msg = talloc_steal(ar, ares->message);
1859                 break;
1860
1861         case LDB_REPLY_REFERRAL:
1862                 /* we ignore referrals */
1863                 break;
1864
1865         case LDB_REPLY_DONE:
1866                 if (ar->search_msg == NULL) {
1867                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1868                 } else {
1869                         ret = replmd_replicated_uptodate_modify(ar);
1870                 }
1871                 if (ret != LDB_SUCCESS) {
1872                         return ldb_module_done(ar->req, NULL, NULL, ret);
1873                 }
1874         }
1875
1876         talloc_free(ares);
1877         return LDB_SUCCESS;
1878 }
1879
1880
1881 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1882 {
1883         struct ldb_context *ldb;
1884         int ret;
1885         static const char *attrs[] = {
1886                 "replUpToDateVector",
1887                 "repsFrom",
1888                 NULL
1889         };
1890         struct ldb_request *search_req;
1891
1892         ldb = ldb_module_get_ctx(ar->module);
1893         ar->search_msg = NULL;
1894
1895         ret = ldb_build_search_req(&search_req,
1896                                    ldb,
1897                                    ar,
1898                                    ar->objs->partition_dn,
1899                                    LDB_SCOPE_BASE,
1900                                    "(objectClass=*)",
1901                                    attrs,
1902                                    NULL,
1903                                    ar,
1904                                    replmd_replicated_uptodate_search_callback,
1905                                    ar->req);
1906         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1907
1908         return ldb_next_request(ar->module, search_req);
1909 }
1910
1911
1912
1913 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1914 {
1915         struct ldb_context *ldb;
1916         struct dsdb_extended_replicated_objects *objs;
1917         struct replmd_replicated_request *ar;
1918         struct ldb_control **ctrls;
1919         int ret, i;
1920         struct dsdb_control_current_partition *partition_ctrl;
1921         struct replmd_private *replmd_private = 
1922                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1923
1924         ldb = ldb_module_get_ctx(module);
1925
1926         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1927
1928         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1929         if (!objs) {
1930                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1931                 return LDB_ERR_PROTOCOL_ERROR;
1932         }
1933
1934         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1935                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1936                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1937                 return LDB_ERR_PROTOCOL_ERROR;
1938         }
1939
1940         ar = replmd_ctx_init(module, req);
1941         if (!ar)
1942                 return LDB_ERR_OPERATIONS_ERROR;
1943
1944         ar->objs = objs;
1945         ar->schema = dsdb_get_schema(ldb);
1946         if (!ar->schema) {
1947                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1948                 talloc_free(ar);
1949                 return LDB_ERR_CONSTRAINT_VIOLATION;
1950         }
1951
1952         ctrls = req->controls;
1953
1954         if (req->controls) {
1955                 req->controls = talloc_memdup(ar, req->controls,
1956                                               talloc_get_size(req->controls));
1957                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1958         }
1959
1960         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1961         if (ret != LDB_SUCCESS) {
1962                 return ret;
1963         }
1964
1965         /*
1966           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1967           tells the partition module which partition this request is
1968           directed at. That is important as the partition roots appear
1969           twice in the directory, once as mount points in the top
1970           level store, and once as the roots of each partition. The
1971           replication code wants to operate on the root of the
1972           partitions, not the top level mount points
1973          */
1974         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1975         if (partition_ctrl == NULL) {
1976                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1977         }
1978         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1979         partition_ctrl->dn = objs->partition_dn;
1980
1981         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1982         if (ret != LDB_SUCCESS) {
1983                 return ret;
1984         }
1985
1986         ar->controls = req->controls;
1987         req->controls = ctrls;
1988
1989         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1990
1991         /* save away the linked attributes for the end of the
1992            transaction */
1993         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1994                 struct la_entry *la_entry;
1995
1996                 if (replmd_private->la_ctx == NULL) {
1997                         replmd_private->la_ctx = talloc_new(replmd_private);
1998                 }
1999                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
2000                 if (la_entry == NULL) {
2001                         ldb_oom(ldb);
2002                         return LDB_ERR_OPERATIONS_ERROR;
2003                 }
2004                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2005                 if (la_entry->la == NULL) {
2006                         talloc_free(la_entry);
2007                         ldb_oom(ldb);
2008                         return LDB_ERR_OPERATIONS_ERROR;
2009                 }
2010                 *la_entry->la = ar->objs->linked_attributes[i];
2011
2012                 /* we need to steal the non-scalars so they stay
2013                    around until the end of the transaction */
2014                 talloc_steal(la_entry->la, la_entry->la->identifier);
2015                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2016
2017                 DLIST_ADD(replmd_private->la_list, la_entry);
2018         }
2019
2020         return replmd_replicated_apply_next(ar);
2021 }
2022
2023 /*
2024   process one linked attribute structure
2025  */
2026 static int replmd_process_linked_attribute(struct ldb_module *module,
2027                                            struct la_entry *la_entry)
2028 {                                          
2029         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2030         struct ldb_context *ldb = ldb_module_get_ctx(module);
2031         struct drsuapi_DsReplicaObjectIdentifier3 target;
2032         struct ldb_message *msg;
2033         struct ldb_message_element *ret_el;
2034         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2035         enum ndr_err_code ndr_err;
2036         char *target_dn;
2037         struct ldb_request *mod_req;
2038         int ret;
2039         const struct dsdb_attribute *attr;
2040
2041 /*
2042 linked_attributes[0]:                                                     
2043      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2044         identifier               : *                                      
2045             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2046                 __ndr_size               : 0x0000003a (58)                
2047                 __ndr_size_sid           : 0x00000000 (0)                 
2048                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2049                 sid                      : S-0-0                               
2050                 __ndr_size_dn            : 0x00000000 (0)                      
2051                 dn                       : ''                                  
2052         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2053         value: struct drsuapi_DsAttributeValue                                 
2054             __ndr_size               : 0x0000007e (126)                        
2055             blob                     : *                                       
2056                 blob                     : DATA_BLOB length=126                
2057         flags                    : 0x00000001 (1)                              
2058                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2059         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2060         meta_data: struct drsuapi_DsReplicaMetaData                            
2061             version                  : 0x00000015 (21)                         
2062             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2063             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2064             originating_usn          : 0x000000000001e19c (123292)             
2065      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2066         __ndr_size               : 0x0000007e (126)                            
2067         __ndr_size_sid           : 0x0000001c (28)                             
2068         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2069         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2070         __ndr_size_dn            : 0x00000022 (34)                                
2071         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2072  */
2073         if (DEBUGLVL(4)) {
2074                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2075         }
2076         
2077         /* decode the target of the link */
2078         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2079                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2080                                        &target,
2081                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2082         if (ndr_err != NDR_ERR_SUCCESS) {
2083                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2084                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2085                 talloc_free(tmp_ctx);
2086                 return LDB_ERR_OPERATIONS_ERROR;
2087         }
2088         if (DEBUGLVL(4)) {
2089                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2090         }
2091
2092         /* construct a modify request for this attribute change */
2093         msg = ldb_msg_new(tmp_ctx);
2094         if (!msg) {
2095                 ldb_oom(ldb);
2096                 talloc_free(tmp_ctx);
2097                 return LDB_ERR_OPERATIONS_ERROR;
2098         }
2099
2100         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2101                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2102         if (ret != LDB_SUCCESS) {
2103                 talloc_free(tmp_ctx);
2104                 return ret;
2105         }
2106
2107         /* find the attribute being modified */
2108         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2109         if (attr == NULL) {
2110                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2111                 talloc_free(tmp_ctx);
2112                 return LDB_ERR_OPERATIONS_ERROR;
2113         }
2114
2115         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2116                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2117                                         LDB_FLAG_MOD_ADD, &ret_el);
2118         } else {
2119                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2120                                         LDB_FLAG_MOD_DELETE, &ret_el);
2121         }
2122         if (ret != LDB_SUCCESS) {
2123                 talloc_free(tmp_ctx);
2124                 return ret;
2125         }
2126         ret_el->values = talloc_array(msg, struct ldb_val, 1);
2127         if (!ret_el->values) {
2128                 ldb_oom(ldb);
2129                 talloc_free(tmp_ctx);
2130                 return LDB_ERR_OPERATIONS_ERROR;
2131         }
2132         ret_el->num_values = 1;
2133
2134         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
2135                                     GUID_string(tmp_ctx, &target.guid),
2136                                     dom_sid_string(tmp_ctx, &target.sid),
2137                                     target.dn);
2138         if (target_dn == NULL) {
2139                 ldb_oom(ldb);
2140                 talloc_free(tmp_ctx);
2141                 return LDB_ERR_OPERATIONS_ERROR;
2142         }
2143         ret_el->values[0] = data_blob_string_const(target_dn);
2144
2145         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2146                                 msg,
2147                                 NULL,
2148                                 NULL, 
2149                                 ldb_op_default_callback,
2150                                 NULL);
2151         if (ret != LDB_SUCCESS) {
2152                 talloc_free(tmp_ctx);
2153                 return ret;
2154         }
2155         talloc_steal(mod_req, msg);
2156
2157         if (DEBUGLVL(4)) {
2158                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2159                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2160         }
2161
2162         /* Run the new request */
2163         ret = ldb_next_request(module, mod_req);
2164
2165         /* we need to wait for this to finish, as we are being called
2166            from the synchronous end_transaction hook of this module */
2167         if (ret == LDB_SUCCESS) {
2168                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2169         }
2170
2171         if (ret != LDB_SUCCESS) {
2172                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2173                           ldb_errstring(ldb),
2174                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2175         }
2176         
2177         talloc_free(tmp_ctx);
2178
2179         return ret;     
2180 }
2181
2182 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2183 {
2184         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2185                 return replmd_extended_replicated_objects(module, req);
2186         }
2187
2188         return ldb_next_request(module, req);
2189 }
2190
2191
2192 /*
2193   we hook into the transaction operations to allow us to 
2194   perform the linked attribute updates at the end of the whole
2195   transaction. This allows a forward linked attribute to be created
2196   before the object is created. During a vampire, w2k8 sends us linked
2197   attributes before the objects they are part of.
2198  */
2199 static int replmd_start_transaction(struct ldb_module *module)
2200 {
2201         /* create our private structure for this transaction */
2202         int i;
2203         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2204                                                                 struct replmd_private);
2205         talloc_free(replmd_private->la_ctx);
2206         replmd_private->la_list = NULL;
2207         replmd_private->la_ctx = NULL;
2208
2209         for (i=0; i<replmd_private->num_ncs; i++) {
2210                 replmd_private->ncs[i].mod_usn = 0;
2211         }
2212
2213         return ldb_next_start_trans(module);
2214 }
2215
2216 /*
2217   on prepare commit we loop over our queued la_context structures and
2218   apply each of them  
2219  */
2220 static int replmd_prepare_commit(struct ldb_module *module)
2221 {
2222         struct replmd_private *replmd_private = 
2223                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2224         struct la_entry *la, *prev;
2225         int ret;
2226
2227         /* walk the list backwards, to do the first entry first, as we
2228          * added the entries with DLIST_ADD() which puts them at the
2229          * start of the list */
2230         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2231
2232         for (; la; la=prev) {
2233                 prev = la->prev;
2234                 DLIST_REMOVE(replmd_private->la_list, la);
2235                 ret = replmd_process_linked_attribute(module, la);
2236                 if (ret != LDB_SUCCESS) {
2237                         return ret;
2238                 }
2239         }
2240
2241         talloc_free(replmd_private->la_ctx);
2242         replmd_private->la_list = NULL;
2243         replmd_private->la_ctx = NULL;
2244
2245         /* possibly change @REPLCHANGED */
2246         ret = replmd_notify_store(module);
2247         if (ret != LDB_SUCCESS) {
2248                 return ret;
2249         }
2250         
2251         return ldb_next_prepare_commit(module);
2252 }
2253
2254 static int replmd_del_transaction(struct ldb_module *module)
2255 {
2256         struct replmd_private *replmd_private = 
2257                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2258         talloc_free(replmd_private->la_ctx);
2259         replmd_private->la_list = NULL;
2260         replmd_private->la_ctx = NULL;
2261         return ldb_next_del_trans(module);
2262 }
2263
2264
2265 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2266         .name          = "repl_meta_data",
2267         .init_context      = replmd_init,
2268         .add               = replmd_add,
2269         .modify            = replmd_modify,
2270         .rename            = replmd_rename,
2271         .extended          = replmd_extended,
2272         .start_transaction = replmd_start_transaction,
2273         .prepare_commit    = replmd_prepare_commit,
2274         .del_transaction   = replmd_del_transaction,
2275 };