s4-repl: use the new dsdb partition uSN helper fns
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         struct la_entry *la_list;
51         uint32_t num_ncs;
52         struct nc_entry {
53                 struct ldb_dn *dn;
54                 struct GUID guid;
55                 uint64_t mod_usn;
56         } *ncs;
57 };
58
59 struct la_entry {
60         struct la_entry *next, *prev;
61         struct drsuapi_DsReplicaLinkedAttribute *la;
62 };
63
64 struct replmd_replicated_request {
65         struct ldb_module *module;
66         struct ldb_request *req;
67
68         const struct dsdb_schema *schema;
69
70         struct dsdb_extended_replicated_objects *objs;
71
72         /* the controls we pass down */
73         struct ldb_control **controls;
74
75         uint32_t index_current;
76
77         struct ldb_message *search_msg;
78 };
79
80
81 /*
82   initialise the module
83   allocate the private structure and build the list
84   of partition DNs for use by replmd_notify()
85  */
86 static int replmd_init(struct ldb_module *module)
87 {
88         struct replmd_private *replmd_private;
89         struct ldb_context *ldb = ldb_module_get_ctx(module);
90
91         replmd_private = talloc_zero(module, struct replmd_private);
92         if (replmd_private == NULL) {
93                 ldb_oom(ldb);
94                 return LDB_ERR_OPERATIONS_ERROR;
95         }
96         ldb_module_set_private(module, replmd_private);
97
98         return ldb_next_init(module);
99 }
100
101
102 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
103 {
104         return ldb_dn_compare(n1->dn, n2->dn);
105 }
106
107 /*
108   build the list of partition DNs for use by replmd_notify()
109  */
110 static int replmd_load_NCs(struct ldb_module *module)
111 {
112         const char *attrs[] = { "namingContexts", NULL };
113         struct ldb_result *res = NULL;
114         int i, ret;
115         TALLOC_CTX *tmp_ctx;
116         struct ldb_context *ldb;
117         struct ldb_message_element *el;
118         struct replmd_private *replmd_private = 
119                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
120
121         if (replmd_private->ncs != NULL) {
122                 return LDB_SUCCESS;
123         }
124
125         ldb = ldb_module_get_ctx(module);
126         tmp_ctx = talloc_new(module);
127
128         /* load the list of naming contexts */
129         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
130                          LDB_SCOPE_BASE, attrs, NULL);
131         if (ret != LDB_SUCCESS ||
132             res->count != 1) {
133                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
134                 return LDB_ERR_OPERATIONS_ERROR;
135         }
136
137         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
138         if (el == NULL) {
139                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
140                 return LDB_ERR_OPERATIONS_ERROR;                
141         }
142
143         replmd_private->num_ncs = el->num_values;
144         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
145                                            replmd_private->num_ncs);
146         if (replmd_private->ncs == NULL) {
147                 ldb_oom(ldb);
148                 return LDB_ERR_OPERATIONS_ERROR;
149         }
150
151         for (i=0; i<replmd_private->num_ncs; i++) {
152                 replmd_private->ncs[i].dn = 
153                         ldb_dn_from_ldb_val(replmd_private->ncs, 
154                                             ldb, &el->values[i]);
155                 replmd_private->ncs[i].mod_usn = 0;
156         }
157
158         talloc_free(res);
159
160         /* now find the GUIDs of each of those DNs */
161         for (i=0; i<replmd_private->num_ncs; i++) {
162                 const char *attrs2[] = { "objectGUID", NULL };
163                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
164                                  LDB_SCOPE_BASE, attrs2, NULL);
165                 if (ret != LDB_SUCCESS ||
166                     res->count != 1) {
167                         /* this happens when the schema is first being
168                            setup */
169                         talloc_free(replmd_private->ncs);
170                         replmd_private->ncs = NULL;
171                         replmd_private->num_ncs = 0;
172                         talloc_free(tmp_ctx);
173                         return LDB_SUCCESS;
174                 }
175                 replmd_private->ncs[i].guid = 
176                         samdb_result_guid(res->msgs[0], "objectGUID");
177                 talloc_free(res);
178         }       
179
180         /* sort the NCs into order, most to least specific */
181         qsort(replmd_private->ncs, replmd_private->num_ncs,
182               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
183
184         
185         talloc_free(tmp_ctx);
186         
187         return LDB_SUCCESS;
188 }
189
190
191 /*
192  * notify the repl task that a object has changed. The notifies are
193  * gathered up in the replmd_private structure then written to the
194  * @REPLCHANGED object in each partition during the prepare_commit
195  */
196 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
197 {
198         int ret, i;
199         struct replmd_private *replmd_private = 
200                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
201
202         ret = replmd_load_NCs(module);
203         if (ret != LDB_SUCCESS) {
204                 return ret;
205         }
206         if (replmd_private->num_ncs == 0) {
207                 return LDB_SUCCESS;
208         }
209
210         for (i=0; i<replmd_private->num_ncs; i++) {
211                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
212                         break;
213                 }
214         }
215         if (i == replmd_private->num_ncs) {
216                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
217                          ldb_dn_get_linearized(dn)));
218                 return LDB_ERR_OPERATIONS_ERROR;
219         }
220
221         if (uSN > replmd_private->ncs[i].mod_usn) {
222             replmd_private->ncs[i].mod_usn = uSN;
223         }
224
225         return LDB_SUCCESS;
226 }
227
228
229 /*
230  * update a @REPLCHANGED record in each partition if there have been
231  * any writes of replicated data in the partition
232  */
233 static int replmd_notify_store(struct ldb_module *module)
234 {
235         int i;
236         struct replmd_private *replmd_private = 
237                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
238         struct ldb_context *ldb = ldb_module_get_ctx(module);
239
240         for (i=0; i<replmd_private->num_ncs; i++) {
241                 int ret;
242
243                 if (replmd_private->ncs[i].mod_usn == 0) {
244                         /* this partition has not changed in this
245                            transaction */
246                         continue;
247                 }
248
249                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
250                                               replmd_private->ncs[i].mod_usn);
251                 if (ret != LDB_SUCCESS) {
252                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
253                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
254                         return ret;
255                 }
256         }
257
258         return LDB_SUCCESS;
259 }
260
261
262 /*
263   created a replmd_replicated_request context
264  */
265 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
266                                                          struct ldb_request *req)
267 {
268         struct ldb_context *ldb;
269         struct replmd_replicated_request *ac;
270
271         ldb = ldb_module_get_ctx(module);
272
273         ac = talloc_zero(req, struct replmd_replicated_request);
274         if (ac == NULL) {
275                 ldb_oom(ldb);
276                 return NULL;
277         }
278
279         ac->module = module;
280         ac->req = req;
281         return ac;
282 }
283
284 /*
285   add a time element to a record
286 */
287 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
288 {
289         struct ldb_message_element *el;
290         char *s;
291
292         if (ldb_msg_find_element(msg, attr) != NULL) {
293                 return LDB_SUCCESS;
294         }
295
296         s = ldb_timestring(msg, t);
297         if (s == NULL) {
298                 return LDB_ERR_OPERATIONS_ERROR;
299         }
300
301         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
302                 return LDB_ERR_OPERATIONS_ERROR;
303         }
304
305         el = ldb_msg_find_element(msg, attr);
306         /* always set as replace. This works because on add ops, the flag
307            is ignored */
308         el->flags = LDB_FLAG_MOD_REPLACE;
309
310         return LDB_SUCCESS;
311 }
312
313 /*
314   add a uint64_t element to a record
315 */
316 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
317 {
318         struct ldb_message_element *el;
319
320         if (ldb_msg_find_element(msg, attr) != NULL) {
321                 return LDB_SUCCESS;
322         }
323
324         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
325                 return LDB_ERR_OPERATIONS_ERROR;
326         }
327
328         el = ldb_msg_find_element(msg, attr);
329         /* always set as replace. This works because on add ops, the flag
330            is ignored */
331         el->flags = LDB_FLAG_MOD_REPLACE;
332
333         return LDB_SUCCESS;
334 }
335
336 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
337                                                    const struct replPropertyMetaData1 *m2,
338                                                    const uint32_t *rdn_attid)
339 {
340         if (m1->attid == m2->attid) {
341                 return 0;
342         }
343
344         /*
345          * the rdn attribute should be at the end!
346          * so we need to return a value greater than zero
347          * which means m1 is greater than m2
348          */
349         if (m1->attid == *rdn_attid) {
350                 return 1;
351         }
352
353         /*
354          * the rdn attribute should be at the end!
355          * so we need to return a value less than zero
356          * which means m2 is greater than m1
357          */
358         if (m2->attid == *rdn_attid) {
359                 return -1;
360         }
361
362         return m1->attid - m2->attid;
363 }
364
365 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
366                                                  const uint32_t *rdn_attid)
367 {
368         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
369                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
370 }
371
372 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
373                                                  const struct ldb_message_element *e2,
374                                                  const struct dsdb_schema *schema)
375 {
376         const struct dsdb_attribute *a1;
377         const struct dsdb_attribute *a2;
378
379         /* 
380          * TODO: make this faster by caching the dsdb_attribute pointer
381          *       on the ldb_messag_element
382          */
383
384         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
385         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
386
387         /*
388          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
389          *       in the schema
390          */
391         if (!a1 || !a2) {
392                 return strcasecmp(e1->name, e2->name);
393         }
394
395         return a1->attributeID_id - a2->attributeID_id;
396 }
397
398 static void replmd_ldb_message_sort(struct ldb_message *msg,
399                                     const struct dsdb_schema *schema)
400 {
401         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
402                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
403 }
404
405 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
406 {
407         struct ldb_context *ldb;
408         struct replmd_replicated_request *ac;
409
410         ac = talloc_get_type(req->context, struct replmd_replicated_request);
411         ldb = ldb_module_get_ctx(ac->module);
412
413         if (!ares) {
414                 return ldb_module_done(ac->req, NULL, NULL,
415                                         LDB_ERR_OPERATIONS_ERROR);
416         }
417         if (ares->error != LDB_SUCCESS) {
418                 return ldb_module_done(ac->req, ares->controls,
419                                         ares->response, ares->error);
420         }
421
422         if (ares->type != LDB_REPLY_DONE) {
423                 ldb_set_errstring(ldb,
424                                   "invalid ldb_reply_type in callback");
425                 talloc_free(ares);
426                 return ldb_module_done(ac->req, NULL, NULL,
427                                         LDB_ERR_OPERATIONS_ERROR);
428         }
429
430         return ldb_module_done(ac->req, ares->controls,
431                                 ares->response, LDB_SUCCESS);
432 }
433
434 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
435 {
436         struct ldb_context *ldb;
437         struct replmd_replicated_request *ac;
438         const struct dsdb_schema *schema;
439         enum ndr_err_code ndr_err;
440         struct ldb_request *down_req;
441         struct ldb_message *msg;
442         const struct dsdb_attribute *rdn_attr = NULL;
443         struct GUID guid;
444         struct ldb_val guid_value;
445         struct replPropertyMetaDataBlob nmd;
446         struct ldb_val nmd_value;
447         uint64_t seq_num;
448         const struct GUID *our_invocation_id;
449         time_t t = time(NULL);
450         NTTIME now;
451         char *time_str;
452         int ret;
453         uint32_t i, ni=0;
454
455         /* do not manipulate our control entries */
456         if (ldb_dn_is_special(req->op.add.message->dn)) {
457                 return ldb_next_request(module, req);
458         }
459
460         ldb = ldb_module_get_ctx(module);
461
462         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
463
464         schema = dsdb_get_schema(ldb);
465         if (!schema) {
466                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
467                               "replmd_add: no dsdb_schema loaded");
468                 return LDB_ERR_CONSTRAINT_VIOLATION;
469         }
470
471         ac = replmd_ctx_init(module, req);
472         if (!ac) {
473                 return LDB_ERR_OPERATIONS_ERROR;
474         }
475
476         ac->schema = schema;
477
478         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
479                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
480                               "replmd_add: it's not allowed to add an object with objectGUID\n");
481                 return LDB_ERR_UNWILLING_TO_PERFORM;
482         }
483
484         /* Get a sequence number from the backend */
485         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
486         if (ret != LDB_SUCCESS) {
487                 return ret;
488         }
489
490         /* a new GUID */
491         guid = GUID_random();
492
493         /* get our invocationId */
494         our_invocation_id = samdb_ntds_invocation_id(ldb);
495         if (!our_invocation_id) {
496                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
497                               "replmd_add: unable to find invocationId\n");
498                 return LDB_ERR_OPERATIONS_ERROR;
499         }
500
501         /* we have to copy the message as the caller might have it as a const */
502         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
503         if (msg == NULL) {
504                 ldb_oom(ldb);
505                 return LDB_ERR_OPERATIONS_ERROR;
506         }
507
508         /* generated times */
509         unix_to_nt_time(&now, t);
510         time_str = ldb_timestring(msg, t);
511         if (!time_str) {
512                 return LDB_ERR_OPERATIONS_ERROR;
513         }
514
515         /* 
516          * remove autogenerated attributes
517          */
518         ldb_msg_remove_attr(msg, "whenCreated");
519         ldb_msg_remove_attr(msg, "whenChanged");
520         ldb_msg_remove_attr(msg, "uSNCreated");
521         ldb_msg_remove_attr(msg, "uSNChanged");
522         ldb_msg_remove_attr(msg, "replPropertyMetaData");
523
524         /*
525          * readd replicated attributes
526          */
527         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
528         if (ret != LDB_SUCCESS) {
529                 ldb_oom(ldb);
530                 return LDB_ERR_OPERATIONS_ERROR;
531         }
532
533         /* build the replication meta_data */
534         ZERO_STRUCT(nmd);
535         nmd.version             = 1;
536         nmd.ctr.ctr1.count      = msg->num_elements;
537         nmd.ctr.ctr1.array      = talloc_array(msg,
538                                                struct replPropertyMetaData1,
539                                                nmd.ctr.ctr1.count);
540         if (!nmd.ctr.ctr1.array) {
541                 ldb_oom(ldb);
542                 return LDB_ERR_OPERATIONS_ERROR;
543         }
544
545         for (i=0; i < msg->num_elements; i++) {
546                 struct ldb_message_element *e = &msg->elements[i];
547                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
548                 const struct dsdb_attribute *sa;
549
550                 if (e->name[0] == '@') continue;
551
552                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
553                 if (!sa) {
554                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
555                                       "replmd_add: attribute '%s' not defined in schema\n",
556                                       e->name);
557                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
558                 }
559
560                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
561                         /* if the attribute is not replicated (0x00000001)
562                          * or constructed (0x00000004) it has no metadata
563                          */
564                         continue;
565                 }
566
567                 m->attid                        = sa->attributeID_id;
568                 m->version                      = 1;
569                 m->originating_change_time      = now;
570                 m->originating_invocation_id    = *our_invocation_id;
571                 m->originating_usn              = seq_num;
572                 m->local_usn                    = seq_num;
573                 ni++;
574
575                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
576                         rdn_attr = sa;
577                 }
578         }
579
580         /* fix meta data count */
581         nmd.ctr.ctr1.count = ni;
582
583         /*
584          * sort meta data array, and move the rdn attribute entry to the end
585          */
586         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
587
588         /* generated NDR encoded values */
589         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
590                                        NULL,
591                                        &guid,
592                                        (ndr_push_flags_fn_t)ndr_push_GUID);
593         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
594                 ldb_oom(ldb);
595                 return LDB_ERR_OPERATIONS_ERROR;
596         }
597         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
598                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
599                                        &nmd,
600                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
601         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
602                 ldb_oom(ldb);
603                 return LDB_ERR_OPERATIONS_ERROR;
604         }
605
606         /*
607          * add the autogenerated values
608          */
609         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
610         if (ret != LDB_SUCCESS) {
611                 ldb_oom(ldb);
612                 return LDB_ERR_OPERATIONS_ERROR;
613         }
614         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
615         if (ret != LDB_SUCCESS) {
616                 ldb_oom(ldb);
617                 return LDB_ERR_OPERATIONS_ERROR;
618         }
619         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
620         if (ret != LDB_SUCCESS) {
621                 ldb_oom(ldb);
622                 return LDB_ERR_OPERATIONS_ERROR;
623         }
624         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
625         if (ret != LDB_SUCCESS) {
626                 ldb_oom(ldb);
627                 return LDB_ERR_OPERATIONS_ERROR;
628         }
629         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
630         if (ret != LDB_SUCCESS) {
631                 ldb_oom(ldb);
632                 return LDB_ERR_OPERATIONS_ERROR;
633         }
634
635         /*
636          * sort the attributes by attid before storing the object
637          */
638         replmd_ldb_message_sort(msg, schema);
639
640         ret = ldb_build_add_req(&down_req, ldb, ac,
641                                 msg,
642                                 req->controls,
643                                 ac, replmd_op_callback,
644                                 req);
645         if (ret != LDB_SUCCESS) {
646                 return ret;
647         }
648
649         ret = replmd_notify(module, msg->dn, seq_num);
650         if (ret != LDB_SUCCESS) {
651                 return ret;
652         }
653
654         /* go on with the call chain */
655         return ldb_next_request(module, down_req);
656 }
657
658
659 /*
660  * update the replPropertyMetaData for one element
661  */
662 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
663                                       struct ldb_message *msg,
664                                       struct ldb_message_element *el,
665                                       struct replPropertyMetaDataBlob *omd,
666                                       struct dsdb_schema *schema,
667                                       uint64_t *seq_num,
668                                       const struct GUID *our_invocation_id,
669                                       NTTIME now)
670 {
671         int i;
672         const struct dsdb_attribute *a;
673         struct replPropertyMetaData1 *md1;
674
675         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
676         if (a == NULL) {
677                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
678                          el->name));
679                 return LDB_ERR_OPERATIONS_ERROR;
680         }
681
682         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
683                 /* if the attribute is not replicated (0x00000001)
684                  * or constructed (0x00000004) it has no metadata
685                  */
686                 return LDB_SUCCESS;
687         }
688
689         for (i=0; i<omd->ctr.ctr1.count; i++) {
690                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
691         }
692         if (i == omd->ctr.ctr1.count) {
693                 /* we need to add a new one */
694                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
695                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
696                 if (omd->ctr.ctr1.array == NULL) {
697                         ldb_oom(ldb);
698                         return LDB_ERR_OPERATIONS_ERROR;
699                 }
700                 omd->ctr.ctr1.count++;
701                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
702         }
703
704         /* Get a new sequence number from the backend. We only do this
705          * if we have a change that requires a new
706          * replPropertyMetaData element 
707          */
708         if (*seq_num == 0) {
709                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
710                 if (ret != LDB_SUCCESS) {
711                         return LDB_ERR_OPERATIONS_ERROR;
712                 }
713         }
714
715         md1 = &omd->ctr.ctr1.array[i];
716         md1->version++;
717         md1->attid                     = a->attributeID_id;
718         md1->originating_change_time   = now;
719         md1->originating_invocation_id = *our_invocation_id;
720         md1->originating_usn           = *seq_num;
721         md1->local_usn                 = *seq_num;
722         
723         return LDB_SUCCESS;
724 }
725
726 /*
727  * update the replPropertyMetaData object each time we modify an
728  * object. This is needed for DRS replication, as the merge on the
729  * client is based on this object 
730  */
731 static int replmd_update_rpmd(struct ldb_module *module, 
732                               struct ldb_message *msg, uint64_t *seq_num)
733 {
734         const struct ldb_val *omd_value;
735         enum ndr_err_code ndr_err;
736         struct replPropertyMetaDataBlob omd;
737         int i;
738         struct dsdb_schema *schema;
739         time_t t = time(NULL);
740         NTTIME now;
741         const struct GUID *our_invocation_id;
742         int ret;
743         const char *attrs[] = { "replPropertyMetaData" , NULL };
744         struct ldb_result *res;
745         struct ldb_context *ldb;
746
747         ldb = ldb_module_get_ctx(module);
748
749         our_invocation_id = samdb_ntds_invocation_id(ldb);
750         if (!our_invocation_id) {
751                 /* this happens during an initial vampire while
752                    updating the schema */
753                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
754                 return LDB_SUCCESS;
755         }
756
757         unix_to_nt_time(&now, t);
758
759         /* search for the existing replPropertyMetaDataBlob */
760         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
761         if (ret != LDB_SUCCESS || res->count < 1) {
762                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
763                          ldb_dn_get_linearized(msg->dn)));
764                 return LDB_ERR_OPERATIONS_ERROR;
765         }
766                 
767
768         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
769         if (!omd_value) {
770                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
771                          ldb_dn_get_linearized(msg->dn)));
772                 return LDB_ERR_OPERATIONS_ERROR;
773         }
774
775         ndr_err = ndr_pull_struct_blob(omd_value, msg,
776                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
777                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
778         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
779                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
780                          ldb_dn_get_linearized(msg->dn)));
781                 return LDB_ERR_OPERATIONS_ERROR;
782         }
783
784         if (omd.version != 1) {
785                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
786                          omd.version, ldb_dn_get_linearized(msg->dn)));
787                 return LDB_ERR_OPERATIONS_ERROR;
788         }
789
790         schema = dsdb_get_schema(ldb);
791
792         for (i=0; i<msg->num_elements; i++) {
793                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
794                                                  our_invocation_id, now);
795                 if (ret != LDB_SUCCESS) {
796                         return ret;
797                 }
798         }
799
800         /*
801          * replmd_update_rpmd_element has done an update if the
802          * seq_num is set
803          */
804         if (*seq_num != 0) {
805                 struct ldb_val *md_value;
806                 struct ldb_message_element *el;
807
808                 md_value = talloc(msg, struct ldb_val);
809                 if (md_value == NULL) {
810                         ldb_oom(ldb);
811                         return LDB_ERR_OPERATIONS_ERROR;
812                 }
813
814                 ndr_err = ndr_push_struct_blob(md_value, msg, 
815                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
816                                                &omd,
817                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
818                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
819                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
820                                  ldb_dn_get_linearized(msg->dn)));
821                         return LDB_ERR_OPERATIONS_ERROR;
822                 }
823
824                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
825                 if (ret != LDB_SUCCESS) {
826                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
827                                  ldb_dn_get_linearized(msg->dn)));
828                         return ret;
829                 }
830
831                 ret = replmd_notify(module, msg->dn, *seq_num);
832                 if (ret != LDB_SUCCESS) {
833                         return ret;
834                 }
835
836                 el->num_values = 1;
837                 el->values = md_value;
838         }
839
840         return LDB_SUCCESS;     
841 }
842
843
844 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
845 {
846         struct ldb_context *ldb;
847         struct replmd_replicated_request *ac;
848         const struct dsdb_schema *schema;
849         struct ldb_request *down_req;
850         struct ldb_message *msg;
851         int ret;
852         time_t t = time(NULL);
853         uint64_t seq_num = 0;
854
855         /* do not manipulate our control entries */
856         if (ldb_dn_is_special(req->op.mod.message->dn)) {
857                 return ldb_next_request(module, req);
858         }
859
860         ldb = ldb_module_get_ctx(module);
861
862         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
863
864         schema = dsdb_get_schema(ldb);
865         if (!schema) {
866                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
867                               "replmd_modify: no dsdb_schema loaded");
868                 return LDB_ERR_CONSTRAINT_VIOLATION;
869         }
870
871         ac = replmd_ctx_init(module, req);
872         if (!ac) {
873                 return LDB_ERR_OPERATIONS_ERROR;
874         }
875
876         ac->schema = schema;
877
878         /* we have to copy the message as the caller might have it as a const */
879         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
880         if (msg == NULL) {
881                 talloc_free(ac);
882                 return LDB_ERR_OPERATIONS_ERROR;
883         }
884
885         /* TODO:
886          * - get the whole old object
887          * - if the old object doesn't exist report an error
888          * - give an error when a readonly attribute should
889          *   be modified
890          * - merge the changed into the old object
891          *   if the caller set values to the same value
892          *   ignore the attribute, return success when no
893          *   attribute was changed
894          */
895
896         ret = replmd_update_rpmd(module, msg, &seq_num);
897         if (ret != LDB_SUCCESS) {
898                 return ret;
899         }
900
901         /* TODO:
902          * - sort the attributes by attid with replmd_ldb_message_sort()
903          * - replace the old object with the newly constructed one
904          */
905
906         ret = ldb_build_mod_req(&down_req, ldb, ac,
907                                 msg,
908                                 req->controls,
909                                 ac, replmd_op_callback,
910                                 req);
911         if (ret != LDB_SUCCESS) {
912                 return ret;
913         }
914         talloc_steal(down_req, msg);
915
916         /* we only change whenChanged and uSNChanged if the seq_num
917            has changed */
918         if (seq_num != 0) {
919                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
920                         talloc_free(ac);
921                         return LDB_ERR_OPERATIONS_ERROR;
922                 }
923
924                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
925                         talloc_free(ac);
926                         return LDB_ERR_OPERATIONS_ERROR;
927                 }
928         }
929
930         /* go on with the call chain */
931         return ldb_next_request(module, down_req);
932 }
933
934 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
935 {
936         return ret;
937 }
938
939 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
940 {
941         int ret = LDB_ERR_OTHER;
942         /* TODO: do some error mapping */
943         return ret;
944 }
945
946 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
947
948 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
949                                                 struct ldb_reply *ares)
950 {
951         struct ldb_context *ldb;
952         struct replmd_replicated_request *ar = talloc_get_type(req->context,
953                                                struct replmd_replicated_request);
954         int ret;
955
956         ldb = ldb_module_get_ctx(ar->module);
957
958         if (!ares) {
959                 return ldb_module_done(ar->req, NULL, NULL,
960                                         LDB_ERR_OPERATIONS_ERROR);
961         }
962         if (ares->error != LDB_SUCCESS) {
963                 return ldb_module_done(ar->req, ares->controls,
964                                         ares->response, ares->error);
965         }
966
967         if (ares->type != LDB_REPLY_DONE) {
968                 ldb_set_errstring(ldb, "Invalid reply type\n!");
969                 return ldb_module_done(ar->req, NULL, NULL,
970                                         LDB_ERR_OPERATIONS_ERROR);
971         }
972
973         talloc_free(ares);
974         ar->index_current++;
975
976         ret = replmd_replicated_apply_next(ar);
977         if (ret != LDB_SUCCESS) {
978                 return ldb_module_done(ar->req, NULL, NULL, ret);
979         }
980
981         return LDB_SUCCESS;
982 }
983
984 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
985 {
986         struct ldb_context *ldb;
987         struct ldb_request *change_req;
988         enum ndr_err_code ndr_err;
989         struct ldb_message *msg;
990         struct replPropertyMetaDataBlob *md;
991         struct ldb_val md_value;
992         uint32_t i;
993         uint64_t seq_num;
994         int ret;
995
996         /*
997          * TODO: check if the parent object exist
998          */
999
1000         /*
1001          * TODO: handle the conflict case where an object with the
1002          *       same name exist
1003          */
1004
1005         ldb = ldb_module_get_ctx(ar->module);
1006         msg = ar->objs->objects[ar->index_current].msg;
1007         md = ar->objs->objects[ar->index_current].meta_data;
1008
1009         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1010         if (ret != LDB_SUCCESS) {
1011                 return replmd_replicated_request_error(ar, ret);
1012         }
1013
1014         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1015         if (ret != LDB_SUCCESS) {
1016                 return replmd_replicated_request_error(ar, ret);
1017         }
1018
1019         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1020         if (ret != LDB_SUCCESS) {
1021                 return replmd_replicated_request_error(ar, ret);
1022         }
1023
1024         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1025         if (ret != LDB_SUCCESS) {
1026                 return replmd_replicated_request_error(ar, ret);
1027         }
1028
1029         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1030         if (ret != LDB_SUCCESS) {
1031                 return replmd_replicated_request_error(ar, ret);
1032         }
1033
1034         ret = replmd_notify(ar->module, msg->dn, seq_num);
1035         if (ret != LDB_SUCCESS) {
1036                 return replmd_replicated_request_error(ar, ret);
1037         }
1038
1039         /*
1040          * the meta data array is already sorted by the caller
1041          */
1042         for (i=0; i < md->ctr.ctr1.count; i++) {
1043                 md->ctr.ctr1.array[i].local_usn = seq_num;
1044         }
1045         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1046                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1047                                        md,
1048                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1049         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1050                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1051                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1052         }
1053         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1054         if (ret != LDB_SUCCESS) {
1055                 return replmd_replicated_request_error(ar, ret);
1056         }
1057
1058         replmd_ldb_message_sort(msg, ar->schema);
1059
1060         if (DEBUGLVL(4)) {
1061                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1062                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1063                 talloc_free(s);
1064         }
1065
1066         ret = ldb_build_add_req(&change_req,
1067                                 ldb,
1068                                 ar,
1069                                 msg,
1070                                 ar->controls,
1071                                 ar,
1072                                 replmd_replicated_apply_add_callback,
1073                                 ar->req);
1074         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1075
1076         return ldb_next_request(ar->module, change_req);
1077 }
1078
1079 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1080                                                          struct replPropertyMetaData1 *m2)
1081 {
1082         int ret;
1083
1084         if (m1->version != m2->version) {
1085                 return m1->version - m2->version;
1086         }
1087
1088         if (m1->originating_change_time != m2->originating_change_time) {
1089                 return m1->originating_change_time - m2->originating_change_time;
1090         }
1091
1092         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1093         if (ret != 0) {
1094                 return ret;
1095         }
1096
1097         return m1->originating_usn - m2->originating_usn;
1098 }
1099
1100 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1101                                                   struct ldb_reply *ares)
1102 {
1103         struct ldb_context *ldb;
1104         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1105                                                struct replmd_replicated_request);
1106         int ret;
1107
1108         ldb = ldb_module_get_ctx(ar->module);
1109
1110         if (!ares) {
1111                 return ldb_module_done(ar->req, NULL, NULL,
1112                                         LDB_ERR_OPERATIONS_ERROR);
1113         }
1114         if (ares->error != LDB_SUCCESS) {
1115                 return ldb_module_done(ar->req, ares->controls,
1116                                         ares->response, ares->error);
1117         }
1118
1119         if (ares->type != LDB_REPLY_DONE) {
1120                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1121                 return ldb_module_done(ar->req, NULL, NULL,
1122                                         LDB_ERR_OPERATIONS_ERROR);
1123         }
1124
1125         talloc_free(ares);
1126         ar->index_current++;
1127
1128         ret = replmd_replicated_apply_next(ar);
1129         if (ret != LDB_SUCCESS) {
1130                 return ldb_module_done(ar->req, NULL, NULL, ret);
1131         }
1132
1133         return LDB_SUCCESS;
1134 }
1135
1136 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1137 {
1138         struct ldb_context *ldb;
1139         struct ldb_request *change_req;
1140         enum ndr_err_code ndr_err;
1141         struct ldb_message *msg;
1142         struct replPropertyMetaDataBlob *rmd;
1143         struct replPropertyMetaDataBlob omd;
1144         const struct ldb_val *omd_value;
1145         struct replPropertyMetaDataBlob nmd;
1146         struct ldb_val nmd_value;
1147         uint32_t i,j,ni=0;
1148         uint32_t removed_attrs = 0;
1149         uint64_t seq_num;
1150         int ret;
1151
1152         ldb = ldb_module_get_ctx(ar->module);
1153         msg = ar->objs->objects[ar->index_current].msg;
1154         rmd = ar->objs->objects[ar->index_current].meta_data;
1155         ZERO_STRUCT(omd);
1156         omd.version = 1;
1157
1158         /*
1159          * TODO: check repl data is correct after a rename
1160          */
1161         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1162                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1163                           ldb_dn_get_linearized(ar->search_msg->dn),
1164                           ldb_dn_get_linearized(msg->dn));
1165                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1166                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1167                                   ldb_dn_get_linearized(ar->search_msg->dn),
1168                                   ldb_dn_get_linearized(msg->dn),
1169                                   ldb_errstring(ldb));
1170                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1171                 }
1172         }
1173
1174         /* find existing meta data */
1175         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1176         if (omd_value) {
1177                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1178                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1179                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1180                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1181                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1182                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1183                 }
1184
1185                 if (omd.version != 1) {
1186                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1187                 }
1188         }
1189
1190         ZERO_STRUCT(nmd);
1191         nmd.version = 1;
1192         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1193         nmd.ctr.ctr1.array = talloc_array(ar,
1194                                           struct replPropertyMetaData1,
1195                                           nmd.ctr.ctr1.count);
1196         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1197
1198         /* first copy the old meta data */
1199         for (i=0; i < omd.ctr.ctr1.count; i++) {
1200                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1201                 ni++;
1202         }
1203
1204         /* now merge in the new meta data */
1205         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1206                 bool found = false;
1207
1208                 for (j=0; j < ni; j++) {
1209                         int cmp;
1210
1211                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1212                                 continue;
1213                         }
1214
1215                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1216                                                                             &nmd.ctr.ctr1.array[j]);
1217                         if (cmp > 0) {
1218                                 /* replace the entry */
1219                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1220                                 found = true;
1221                                 break;
1222                         }
1223
1224                         /* we don't want to apply this change so remove the attribute */
1225                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1226                         removed_attrs++;
1227
1228                         found = true;
1229                         break;
1230                 }
1231
1232                 if (found) continue;
1233
1234                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1235                 ni++;
1236         }
1237
1238         /*
1239          * finally correct the size of the meta_data array
1240          */
1241         nmd.ctr.ctr1.count = ni;
1242
1243         /*
1244          * the rdn attribute (the alias for the name attribute),
1245          * 'cn' for most objects is the last entry in the meta data array
1246          * we have stored
1247          *
1248          * sort the new meta data array
1249          */
1250         {
1251                 struct replPropertyMetaData1 *rdn_p;
1252                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1253
1254                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1255                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1256         }
1257
1258         /* create the meta data value */
1259         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1260                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1261                                        &nmd,
1262                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1263         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1264                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1265                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1266         }
1267
1268         /*
1269          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1270          */
1271         if (msg->num_elements == 0) {
1272                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1273                           ar->index_current);
1274
1275                 ar->index_current++;
1276                 return replmd_replicated_apply_next(ar);
1277         }
1278
1279         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1280                   ar->index_current, msg->num_elements);
1281
1282         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1283         if (ret != LDB_SUCCESS) {
1284                 return replmd_replicated_request_error(ar, ret);
1285         }
1286
1287         for (i=0; i<ni; i++) {
1288                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1289         }
1290
1291         /*
1292          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1293          * and replPopertyMetaData attributes
1294          */
1295         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1296         if (ret != LDB_SUCCESS) {
1297                 return replmd_replicated_request_error(ar, ret);
1298         }
1299         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1300         if (ret != LDB_SUCCESS) {
1301                 return replmd_replicated_request_error(ar, ret);
1302         }
1303         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1304         if (ret != LDB_SUCCESS) {
1305                 return replmd_replicated_request_error(ar, ret);
1306         }
1307
1308         replmd_ldb_message_sort(msg, ar->schema);
1309
1310         /* we want to replace the old values */
1311         for (i=0; i < msg->num_elements; i++) {
1312                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1313         }
1314
1315         ret = replmd_notify(ar->module, msg->dn, seq_num);
1316         if (ret != LDB_SUCCESS) {
1317                 return replmd_replicated_request_error(ar, ret);
1318         }
1319
1320         if (DEBUGLVL(4)) {
1321                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1322                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1323                 talloc_free(s);
1324         }
1325
1326         ret = ldb_build_mod_req(&change_req,
1327                                 ldb,
1328                                 ar,
1329                                 msg,
1330                                 ar->controls,
1331                                 ar,
1332                                 replmd_replicated_apply_merge_callback,
1333                                 ar->req);
1334         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1335
1336         return ldb_next_request(ar->module, change_req);
1337 }
1338
1339 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1340                                                    struct ldb_reply *ares)
1341 {
1342         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1343                                                struct replmd_replicated_request);
1344         int ret;
1345
1346         if (!ares) {
1347                 return ldb_module_done(ar->req, NULL, NULL,
1348                                         LDB_ERR_OPERATIONS_ERROR);
1349         }
1350         if (ares->error != LDB_SUCCESS &&
1351             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1352                 return ldb_module_done(ar->req, ares->controls,
1353                                         ares->response, ares->error);
1354         }
1355
1356         switch (ares->type) {
1357         case LDB_REPLY_ENTRY:
1358                 ar->search_msg = talloc_steal(ar, ares->message);
1359                 break;
1360
1361         case LDB_REPLY_REFERRAL:
1362                 /* we ignore referrals */
1363                 break;
1364
1365         case LDB_REPLY_DONE:
1366                 if (ar->search_msg != NULL) {
1367                         ret = replmd_replicated_apply_merge(ar);
1368                 } else {
1369                         ret = replmd_replicated_apply_add(ar);
1370                 }
1371                 if (ret != LDB_SUCCESS) {
1372                         return ldb_module_done(ar->req, NULL, NULL, ret);
1373                 }
1374         }
1375
1376         talloc_free(ares);
1377         return LDB_SUCCESS;
1378 }
1379
1380 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1381
1382 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1383 {
1384         struct ldb_context *ldb;
1385         int ret;
1386         char *tmp_str;
1387         char *filter;
1388         struct ldb_request *search_req;
1389
1390         if (ar->index_current >= ar->objs->num_objects) {
1391                 /* done with it, go to next stage */
1392                 return replmd_replicated_uptodate_vector(ar);
1393         }
1394
1395         ldb = ldb_module_get_ctx(ar->module);
1396         ar->search_msg = NULL;
1397
1398         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1399         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1400
1401         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1402         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1403         talloc_free(tmp_str);
1404
1405         ret = ldb_build_search_req(&search_req,
1406                                    ldb,
1407                                    ar,
1408                                    ar->objs->partition_dn,
1409                                    LDB_SCOPE_SUBTREE,
1410                                    filter,
1411                                    NULL,
1412                                    NULL,
1413                                    ar,
1414                                    replmd_replicated_apply_search_callback,
1415                                    ar->req);
1416         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1417
1418         return ldb_next_request(ar->module, search_req);
1419 }
1420
1421 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1422                                                       struct ldb_reply *ares)
1423 {
1424         struct ldb_context *ldb;
1425         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1426                                                struct replmd_replicated_request);
1427         ldb = ldb_module_get_ctx(ar->module);
1428
1429         if (!ares) {
1430                 return ldb_module_done(ar->req, NULL, NULL,
1431                                         LDB_ERR_OPERATIONS_ERROR);
1432         }
1433         if (ares->error != LDB_SUCCESS) {
1434                 return ldb_module_done(ar->req, ares->controls,
1435                                         ares->response, ares->error);
1436         }
1437
1438         if (ares->type != LDB_REPLY_DONE) {
1439                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1440                 return ldb_module_done(ar->req, NULL, NULL,
1441                                         LDB_ERR_OPERATIONS_ERROR);
1442         }
1443
1444         talloc_free(ares);
1445
1446         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1447 }
1448
1449 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1450                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1451 {
1452         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1453 }
1454
1455 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1456 {
1457         struct ldb_context *ldb;
1458         struct ldb_request *change_req;
1459         enum ndr_err_code ndr_err;
1460         struct ldb_message *msg;
1461         struct replUpToDateVectorBlob ouv;
1462         const struct ldb_val *ouv_value;
1463         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1464         struct replUpToDateVectorBlob nuv;
1465         struct ldb_val nuv_value;
1466         struct ldb_message_element *nuv_el = NULL;
1467         const struct GUID *our_invocation_id;
1468         struct ldb_message_element *orf_el = NULL;
1469         struct repsFromToBlob nrf;
1470         struct ldb_val *nrf_value = NULL;
1471         struct ldb_message_element *nrf_el = NULL;
1472         uint32_t i,j,ni=0;
1473         bool found = false;
1474         time_t t = time(NULL);
1475         NTTIME now;
1476         int ret;
1477
1478         ldb = ldb_module_get_ctx(ar->module);
1479         ruv = ar->objs->uptodateness_vector;
1480         ZERO_STRUCT(ouv);
1481         ouv.version = 2;
1482         ZERO_STRUCT(nuv);
1483         nuv.version = 2;
1484
1485         unix_to_nt_time(&now, t);
1486
1487         /*
1488          * first create the new replUpToDateVector
1489          */
1490         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1491         if (ouv_value) {
1492                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1493                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1494                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1495                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1496                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1497                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1498                 }
1499
1500                 if (ouv.version != 2) {
1501                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1502                 }
1503         }
1504
1505         /*
1506          * the new uptodateness vector will at least
1507          * contain 1 entry, one for the source_dsa
1508          *
1509          * plus optional values from our old vector and the one from the source_dsa
1510          */
1511         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1512         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1513         nuv.ctr.ctr2.cursors = talloc_array(ar,
1514                                             struct drsuapi_DsReplicaCursor2,
1515                                             nuv.ctr.ctr2.count);
1516         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1517
1518         /* first copy the old vector */
1519         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1520                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1521                 ni++;
1522         }
1523
1524         /* get our invocation_id if we have one already attached to the ldb */
1525         our_invocation_id = samdb_ntds_invocation_id(ldb);
1526
1527         /* merge in the source_dsa vector is available */
1528         for (i=0; (ruv && i < ruv->count); i++) {
1529                 found = false;
1530
1531                 if (our_invocation_id &&
1532                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1533                                our_invocation_id)) {
1534                         continue;
1535                 }
1536
1537                 for (j=0; j < ni; j++) {
1538                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1539                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1540                                 continue;
1541                         }
1542
1543                         found = true;
1544
1545                         /*
1546                          * we update only the highest_usn and not the latest_sync_success time,
1547                          * because the last success stands for direct replication
1548                          */
1549                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1550                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1551                         }
1552                         break;                  
1553                 }
1554
1555                 if (found) continue;
1556
1557                 /* if it's not there yet, add it */
1558                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1559                 ni++;
1560         }
1561
1562         /*
1563          * merge in the current highwatermark for the source_dsa
1564          */
1565         found = false;
1566         for (j=0; j < ni; j++) {
1567                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1568                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1569                         continue;
1570                 }
1571
1572                 found = true;
1573
1574                 /*
1575                  * here we update the highest_usn and last_sync_success time
1576                  * because we're directly replicating from the source_dsa
1577                  *
1578                  * and use the tmp_highest_usn because this is what we have just applied
1579                  * to our ldb
1580                  */
1581                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1582                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1583                 break;
1584         }
1585         if (!found) {
1586                 /*
1587                  * here we update the highest_usn and last_sync_success time
1588                  * because we're directly replicating from the source_dsa
1589                  *
1590                  * and use the tmp_highest_usn because this is what we have just applied
1591                  * to our ldb
1592                  */
1593                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1594                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1595                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1596                 ni++;
1597         }
1598
1599         /*
1600          * finally correct the size of the cursors array
1601          */
1602         nuv.ctr.ctr2.count = ni;
1603
1604         /*
1605          * sort the cursors
1606          */
1607         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1608               sizeof(struct drsuapi_DsReplicaCursor2),
1609               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1610
1611         /*
1612          * create the change ldb_message
1613          */
1614         msg = ldb_msg_new(ar);
1615         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1616         msg->dn = ar->search_msg->dn;
1617
1618         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1619                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1620                                        &nuv,
1621                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1622         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1623                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1624                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1625         }
1626         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1627         if (ret != LDB_SUCCESS) {
1628                 return replmd_replicated_request_error(ar, ret);
1629         }
1630         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1631
1632         /*
1633          * now create the new repsFrom value from the given repsFromTo1 structure
1634          */
1635         ZERO_STRUCT(nrf);
1636         nrf.version                                     = 1;
1637         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1638         /* and fix some values... */
1639         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1640         nrf.ctr.ctr1.last_success                       = now;
1641         nrf.ctr.ctr1.last_attempt                       = now;
1642         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1643         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1644
1645         /*
1646          * first see if we already have a repsFrom value for the current source dsa
1647          * if so we'll later replace this value
1648          */
1649         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1650         if (orf_el) {
1651                 for (i=0; i < orf_el->num_values; i++) {
1652                         struct repsFromToBlob *trf;
1653
1654                         trf = talloc(ar, struct repsFromToBlob);
1655                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1656
1657                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1658                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1659                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1660                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1661                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1662                         }
1663
1664                         if (trf->version != 1) {
1665                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1666                         }
1667
1668                         /*
1669                          * we compare the source dsa objectGUID not the invocation_id
1670                          * because we want only one repsFrom value per source dsa
1671                          * and when the invocation_id of the source dsa has changed we don't need 
1672                          * the old repsFrom with the old invocation_id
1673                          */
1674                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1675                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1676                                 talloc_free(trf);
1677                                 continue;
1678                         }
1679
1680                         talloc_free(trf);
1681                         nrf_value = &orf_el->values[i];
1682                         break;
1683                 }
1684
1685                 /*
1686                  * copy over all old values to the new ldb_message
1687                  */
1688                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1689                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1690                 *nrf_el = *orf_el;
1691         }
1692
1693         /*
1694          * if we haven't found an old repsFrom value for the current source dsa
1695          * we'll add a new value
1696          */
1697         if (!nrf_value) {
1698                 struct ldb_val zero_value;
1699                 ZERO_STRUCT(zero_value);
1700                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1701                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1702
1703                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1704         }
1705
1706         /* we now fill the value which is already attached to ldb_message */
1707         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1708                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1709                                        &nrf,
1710                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1711         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1712                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1713                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1714         }
1715
1716         /* 
1717          * the ldb_message_element for the attribute, has all the old values and the new one
1718          * so we'll replace the whole attribute with all values
1719          */
1720         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1721
1722         if (DEBUGLVL(4)) {
1723                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1724                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1725                 talloc_free(s);
1726         }
1727
1728         /* prepare the ldb_modify() request */
1729         ret = ldb_build_mod_req(&change_req,
1730                                 ldb,
1731                                 ar,
1732                                 msg,
1733                                 ar->controls,
1734                                 ar,
1735                                 replmd_replicated_uptodate_modify_callback,
1736                                 ar->req);
1737         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1738
1739         return ldb_next_request(ar->module, change_req);
1740 }
1741
1742 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1743                                                       struct ldb_reply *ares)
1744 {
1745         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1746                                                struct replmd_replicated_request);
1747         int ret;
1748
1749         if (!ares) {
1750                 return ldb_module_done(ar->req, NULL, NULL,
1751                                         LDB_ERR_OPERATIONS_ERROR);
1752         }
1753         if (ares->error != LDB_SUCCESS &&
1754             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1755                 return ldb_module_done(ar->req, ares->controls,
1756                                         ares->response, ares->error);
1757         }
1758
1759         switch (ares->type) {
1760         case LDB_REPLY_ENTRY:
1761                 ar->search_msg = talloc_steal(ar, ares->message);
1762                 break;
1763
1764         case LDB_REPLY_REFERRAL:
1765                 /* we ignore referrals */
1766                 break;
1767
1768         case LDB_REPLY_DONE:
1769                 if (ar->search_msg == NULL) {
1770                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1771                 } else {
1772                         ret = replmd_replicated_uptodate_modify(ar);
1773                 }
1774                 if (ret != LDB_SUCCESS) {
1775                         return ldb_module_done(ar->req, NULL, NULL, ret);
1776                 }
1777         }
1778
1779         talloc_free(ares);
1780         return LDB_SUCCESS;
1781 }
1782
1783
1784 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1785 {
1786         struct ldb_context *ldb;
1787         int ret;
1788         static const char *attrs[] = {
1789                 "replUpToDateVector",
1790                 "repsFrom",
1791                 NULL
1792         };
1793         struct ldb_request *search_req;
1794
1795         ldb = ldb_module_get_ctx(ar->module);
1796         ar->search_msg = NULL;
1797
1798         ret = ldb_build_search_req(&search_req,
1799                                    ldb,
1800                                    ar,
1801                                    ar->objs->partition_dn,
1802                                    LDB_SCOPE_BASE,
1803                                    "(objectClass=*)",
1804                                    attrs,
1805                                    NULL,
1806                                    ar,
1807                                    replmd_replicated_uptodate_search_callback,
1808                                    ar->req);
1809         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1810
1811         return ldb_next_request(ar->module, search_req);
1812 }
1813
1814
1815
1816 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1817 {
1818         struct ldb_context *ldb;
1819         struct dsdb_extended_replicated_objects *objs;
1820         struct replmd_replicated_request *ar;
1821         struct ldb_control **ctrls;
1822         int ret, i;
1823         struct dsdb_control_current_partition *partition_ctrl;
1824         struct replmd_private *replmd_private = 
1825                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1826
1827         ldb = ldb_module_get_ctx(module);
1828
1829         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1830
1831         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1832         if (!objs) {
1833                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1834                 return LDB_ERR_PROTOCOL_ERROR;
1835         }
1836
1837         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1838                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1839                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1840                 return LDB_ERR_PROTOCOL_ERROR;
1841         }
1842
1843         ar = replmd_ctx_init(module, req);
1844         if (!ar)
1845                 return LDB_ERR_OPERATIONS_ERROR;
1846
1847         ar->objs = objs;
1848         ar->schema = dsdb_get_schema(ldb);
1849         if (!ar->schema) {
1850                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1851                 talloc_free(ar);
1852                 return LDB_ERR_CONSTRAINT_VIOLATION;
1853         }
1854
1855         ctrls = req->controls;
1856
1857         if (req->controls) {
1858                 req->controls = talloc_memdup(ar, req->controls,
1859                                               talloc_get_size(req->controls));
1860                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1861         }
1862
1863         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1864         if (ret != LDB_SUCCESS) {
1865                 return ret;
1866         }
1867
1868         /*
1869           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1870           tells the partition module which partition this request is
1871           directed at. That is important as the partition roots appear
1872           twice in the directory, once as mount points in the top
1873           level store, and once as the roots of each partition. The
1874           replication code wants to operate on the root of the
1875           partitions, not the top level mount points
1876          */
1877         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1878         if (partition_ctrl == NULL) {
1879                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1880         }
1881         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1882         partition_ctrl->dn = objs->partition_dn;
1883
1884         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1885         if (ret != LDB_SUCCESS) {
1886                 return ret;
1887         }
1888
1889         ar->controls = req->controls;
1890         req->controls = ctrls;
1891
1892         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1893
1894         /* save away the linked attributes for the end of the
1895            transaction */
1896         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1897                 struct la_entry *la_entry;
1898
1899                 if (replmd_private->la_list) {
1900                         la_entry = talloc(replmd_private->la_list,
1901                                           struct la_entry);
1902                 } else {
1903                         la_entry = talloc(replmd_private,
1904                                           struct la_entry);
1905                 }
1906                 if (la_entry == NULL) {
1907                         ldb_oom(ldb);
1908                         return LDB_ERR_OPERATIONS_ERROR;
1909                 }
1910                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1911                 if (la_entry->la == NULL) {
1912                         talloc_free(la_entry);
1913                         ldb_oom(ldb);
1914                         return LDB_ERR_OPERATIONS_ERROR;
1915                 }
1916                 *la_entry->la = ar->objs->linked_attributes[i];
1917
1918                 /* we need to steal the non-scalars so they stay
1919                    around until the end of the transaction */
1920                 talloc_steal(la_entry->la, la_entry->la->identifier);
1921                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1922
1923                 DLIST_ADD(replmd_private->la_list, la_entry);
1924         }
1925
1926         return replmd_replicated_apply_next(ar);
1927 }
1928
1929 /*
1930   process one linked attribute structure
1931  */
1932 static int replmd_process_linked_attribute(struct ldb_module *module,
1933                                            struct la_entry *la_entry)
1934 {                                          
1935         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1936         struct ldb_context *ldb = ldb_module_get_ctx(module);
1937         struct drsuapi_DsReplicaObjectIdentifier3 target;
1938         struct ldb_message *msg;
1939         struct ldb_message_element *ret_el;
1940         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1941         enum ndr_err_code ndr_err;
1942         char *target_dn;
1943         struct ldb_request *mod_req;
1944         int ret;
1945         const struct dsdb_attribute *attr;
1946
1947 /*
1948 linked_attributes[0]:                                                     
1949      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1950         identifier               : *                                      
1951             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1952                 __ndr_size               : 0x0000003a (58)                
1953                 __ndr_size_sid           : 0x00000000 (0)                 
1954                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1955                 sid                      : S-0-0                               
1956                 __ndr_size_dn            : 0x00000000 (0)                      
1957                 dn                       : ''                                  
1958         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1959         value: struct drsuapi_DsAttributeValue                                 
1960             __ndr_size               : 0x0000007e (126)                        
1961             blob                     : *                                       
1962                 blob                     : DATA_BLOB length=126                
1963         flags                    : 0x00000001 (1)                              
1964                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1965         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1966         meta_data: struct drsuapi_DsReplicaMetaData                            
1967             version                  : 0x00000015 (21)                         
1968             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
1969             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
1970             originating_usn          : 0x000000000001e19c (123292)             
1971      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
1972         __ndr_size               : 0x0000007e (126)                            
1973         __ndr_size_sid           : 0x0000001c (28)                             
1974         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
1975         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
1976         __ndr_size_dn            : 0x00000022 (34)                                
1977         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
1978  */
1979         if (DEBUGLVL(4)) {
1980                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
1981         }
1982         
1983         /* decode the target of the link */
1984         ndr_err = ndr_pull_struct_blob(la->value.blob, 
1985                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1986                                        &target,
1987                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
1988         if (ndr_err != NDR_ERR_SUCCESS) {
1989                 DEBUG(0,("Unable to decode linked_attribute target\n"));
1990                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
1991                 talloc_free(tmp_ctx);
1992                 return LDB_ERR_OPERATIONS_ERROR;
1993         }
1994         if (DEBUGLVL(4)) {
1995                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
1996         }
1997
1998         /* construct a modify request for this attribute change */
1999         msg = ldb_msg_new(tmp_ctx);
2000         if (!msg) {
2001                 ldb_oom(ldb);
2002                 talloc_free(tmp_ctx);
2003                 return LDB_ERR_OPERATIONS_ERROR;
2004         }
2005
2006         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2007                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2008         if (ret != LDB_SUCCESS) {
2009                 talloc_free(tmp_ctx);
2010                 return ret;
2011         }
2012
2013         /* find the attribute being modified */
2014         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2015         if (attr == NULL) {
2016                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2017                 talloc_free(tmp_ctx);
2018                 return LDB_ERR_OPERATIONS_ERROR;
2019         }
2020
2021         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2022                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2023                                         LDB_FLAG_MOD_ADD, &ret_el);
2024         } else {
2025                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2026                                         LDB_FLAG_MOD_DELETE, &ret_el);
2027         }
2028         if (ret != LDB_SUCCESS) {
2029                 talloc_free(tmp_ctx);
2030                 return ret;
2031         }
2032         ret_el->values = talloc_array(msg, struct ldb_val, 1);
2033         if (!ret_el->values) {
2034                 ldb_oom(ldb);
2035                 talloc_free(tmp_ctx);
2036                 return LDB_ERR_OPERATIONS_ERROR;
2037         }
2038         ret_el->num_values = 1;
2039
2040         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
2041                                     GUID_string(tmp_ctx, &target.guid),
2042                                     dom_sid_string(tmp_ctx, &target.sid),
2043                                     target.dn);
2044         if (target_dn == NULL) {
2045                 ldb_oom(ldb);
2046                 talloc_free(tmp_ctx);
2047                 return LDB_ERR_OPERATIONS_ERROR;
2048         }
2049         ret_el->values[0] = data_blob_string_const(target_dn);
2050
2051         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2052                                 msg,
2053                                 NULL,
2054                                 NULL, 
2055                                 ldb_op_default_callback,
2056                                 NULL);
2057         if (ret != LDB_SUCCESS) {
2058                 talloc_free(tmp_ctx);
2059                 return ret;
2060         }
2061         talloc_steal(mod_req, msg);
2062
2063         if (DEBUGLVL(4)) {
2064                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2065                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2066         }
2067
2068         /* Run the new request */
2069         ret = ldb_next_request(module, mod_req);
2070
2071         /* we need to wait for this to finish, as we are being called
2072            from the synchronous end_transaction hook of this module */
2073         if (ret == LDB_SUCCESS) {
2074                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2075         }
2076
2077         if (ret != LDB_SUCCESS) {
2078                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2079                           ldb_errstring(ldb),
2080                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2081         }
2082         
2083         talloc_free(tmp_ctx);
2084
2085         return ret;     
2086 }
2087
2088 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2089 {
2090         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2091                 return replmd_extended_replicated_objects(module, req);
2092         }
2093
2094         return ldb_next_request(module, req);
2095 }
2096
2097
2098 /*
2099   we hook into the transaction operations to allow us to 
2100   perform the linked attribute updates at the end of the whole
2101   transaction. This allows a forward linked attribute to be created
2102   before the object is created. During a vampire, w2k8 sends us linked
2103   attributes before the objects they are part of.
2104  */
2105 static int replmd_start_transaction(struct ldb_module *module)
2106 {
2107         /* create our private structure for this transaction */
2108         int i;
2109         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2110                                                                 struct replmd_private);
2111         talloc_free(replmd_private->la_list);
2112         replmd_private->la_list = NULL;
2113
2114         for (i=0; i<replmd_private->num_ncs; i++) {
2115                 replmd_private->ncs[i].mod_usn = 0;
2116         }
2117
2118         return ldb_next_start_trans(module);
2119 }
2120
2121 /*
2122   on prepare commit we loop over our queued la_context structures and
2123   apply each of them  
2124  */
2125 static int replmd_prepare_commit(struct ldb_module *module)
2126 {
2127         struct replmd_private *replmd_private = 
2128                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2129         struct la_entry *la, *prev;
2130         int ret;
2131
2132         /* walk the list backwards, to do the first entry first, as we
2133          * added the entries with DLIST_ADD() which puts them at the
2134          * start of the list */
2135         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2136
2137         for (; la; la=prev) {
2138                 prev = la->prev;
2139                 DLIST_REMOVE(replmd_private->la_list, la);
2140                 ret = replmd_process_linked_attribute(module, la);
2141                 talloc_free(la);
2142                 if (ret != LDB_SUCCESS) {
2143                         return ret;
2144                 }
2145         }
2146
2147         talloc_free(replmd_private->la_list);
2148         replmd_private->la_list = NULL;
2149
2150         /* possibly change @REPLCHANGED */
2151         ret = replmd_notify_store(module);
2152         if (ret != LDB_SUCCESS) {
2153                 return ret;
2154         }
2155         
2156         return ldb_next_prepare_commit(module);
2157 }
2158
2159 static int replmd_del_transaction(struct ldb_module *module)
2160 {
2161         struct replmd_private *replmd_private = 
2162                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2163         talloc_free(replmd_private->la_list);
2164         replmd_private->la_list = NULL;
2165         return ldb_next_del_trans(module);
2166 }
2167
2168
2169 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2170         .name          = "repl_meta_data",
2171         .init_context      = replmd_init,
2172         .add               = replmd_add,
2173         .modify            = replmd_modify,
2174         .extended          = replmd_extended,
2175         .start_transaction = replmd_start_transaction,
2176         .prepare_commit    = replmd_prepare_commit,
2177         .del_transaction   = replmd_del_transaction,
2178 };