a0dd111bdf4e83dddf6e6c706ee04e544a98c969
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct ldb_dn *dn;
55                 struct GUID guid;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
/*
 * doubly linked list node holding a pointer to one linked attribute
 */
struct la_entry {
        struct la_entry *next;
        struct la_entry *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
 * per-request context, created zeroed by replmd_ctx_init() as a talloc
 * child of the original request
 */
struct replmd_replicated_request {
        /* the module handling the request and the original request */
        struct ldb_module *module;
        struct ldb_request *req;

        /* schema in use for this request (set by replmd_add()) */
        const struct dsdb_schema *schema;

        struct dsdb_extended_replicated_objects *objs;

        /* the controls we pass down */
        struct ldb_control **controls;

        uint32_t index_current;

        struct ldb_message *search_msg;

        /* sequence number (uSN) allocated for this operation */
        uint64_t seq_num;
};
82
83
84 /*
85   initialise the module
86   allocate the private structure and build the list
87   of partition DNs for use by replmd_notify()
88  */
89 static int replmd_init(struct ldb_module *module)
90 {
91         struct replmd_private *replmd_private;
92         struct ldb_context *ldb = ldb_module_get_ctx(module);
93
94         replmd_private = talloc_zero(module, struct replmd_private);
95         if (replmd_private == NULL) {
96                 ldb_oom(ldb);
97                 return LDB_ERR_OPERATIONS_ERROR;
98         }
99         ldb_module_set_private(module, replmd_private);
100
101         return ldb_next_init(module);
102 }
103
104
105 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
106 {
107         return ldb_dn_compare(n1->dn, n2->dn);
108 }
109
110 /*
111   build the list of partition DNs for use by replmd_notify()
112  */
113 static int replmd_load_NCs(struct ldb_module *module)
114 {
115         const char *attrs[] = { "namingContexts", NULL };
116         struct ldb_result *res = NULL;
117         int i, ret;
118         TALLOC_CTX *tmp_ctx;
119         struct ldb_context *ldb;
120         struct ldb_message_element *el;
121         struct replmd_private *replmd_private = 
122                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
123
124         if (replmd_private->ncs != NULL) {
125                 return LDB_SUCCESS;
126         }
127
128         ldb = ldb_module_get_ctx(module);
129         tmp_ctx = talloc_new(module);
130
131         /* load the list of naming contexts */
132         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
133                          LDB_SCOPE_BASE, attrs, NULL);
134         if (ret != LDB_SUCCESS ||
135             res->count != 1) {
136                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
141         if (el == NULL) {
142                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
143                 return LDB_ERR_OPERATIONS_ERROR;                
144         }
145
146         replmd_private->num_ncs = el->num_values;
147         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
148                                            replmd_private->num_ncs);
149         if (replmd_private->ncs == NULL) {
150                 ldb_oom(ldb);
151                 return LDB_ERR_OPERATIONS_ERROR;
152         }
153
154         for (i=0; i<replmd_private->num_ncs; i++) {
155                 replmd_private->ncs[i].dn = 
156                         ldb_dn_from_ldb_val(replmd_private->ncs, 
157                                             ldb, &el->values[i]);
158                 replmd_private->ncs[i].mod_usn = 0;
159         }
160
161         talloc_free(res);
162
163         /* now find the GUIDs of each of those DNs */
164         for (i=0; i<replmd_private->num_ncs; i++) {
165                 const char *attrs2[] = { "objectGUID", NULL };
166                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
167                                  LDB_SCOPE_BASE, attrs2, NULL);
168                 if (ret != LDB_SUCCESS ||
169                     res->count != 1) {
170                         /* this happens when the schema is first being
171                            setup */
172                         talloc_free(replmd_private->ncs);
173                         replmd_private->ncs = NULL;
174                         replmd_private->num_ncs = 0;
175                         talloc_free(tmp_ctx);
176                         return LDB_SUCCESS;
177                 }
178                 replmd_private->ncs[i].guid = 
179                         samdb_result_guid(res->msgs[0], "objectGUID");
180                 talloc_free(res);
181         }       
182
183         /* sort the NCs into order, most to least specific */
184         qsort(replmd_private->ncs, replmd_private->num_ncs,
185               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
186
187         
188         talloc_free(tmp_ctx);
189         
190         return LDB_SUCCESS;
191 }
192
193
194 /*
195  * notify the repl task that a object has changed. The notifies are
196  * gathered up in the replmd_private structure then written to the
197  * @REPLCHANGED object in each partition during the prepare_commit
198  */
199 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
200 {
201         int ret, i;
202         struct replmd_private *replmd_private = 
203                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
204
205         ret = replmd_load_NCs(module);
206         if (ret != LDB_SUCCESS) {
207                 return ret;
208         }
209         if (replmd_private->num_ncs == 0) {
210                 return LDB_SUCCESS;
211         }
212
213         for (i=0; i<replmd_private->num_ncs; i++) {
214                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
215                         break;
216                 }
217         }
218         if (i == replmd_private->num_ncs) {
219                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
220                          ldb_dn_get_linearized(dn)));
221                 return LDB_ERR_OPERATIONS_ERROR;
222         }
223
224         if (uSN > replmd_private->ncs[i].mod_usn) {
225             replmd_private->ncs[i].mod_usn = uSN;
226         }
227
228         return LDB_SUCCESS;
229 }
230
231
232 /*
233  * update a @REPLCHANGED record in each partition if there have been
234  * any writes of replicated data in the partition
235  */
236 static int replmd_notify_store(struct ldb_module *module)
237 {
238         int i;
239         struct replmd_private *replmd_private = 
240                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
241         struct ldb_context *ldb = ldb_module_get_ctx(module);
242
243         for (i=0; i<replmd_private->num_ncs; i++) {
244                 int ret;
245
246                 if (replmd_private->ncs[i].mod_usn == 0) {
247                         /* this partition has not changed in this
248                            transaction */
249                         continue;
250                 }
251
252                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
253                                               replmd_private->ncs[i].mod_usn);
254                 if (ret != LDB_SUCCESS) {
255                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
256                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
257                         return ret;
258                 }
259         }
260
261         return LDB_SUCCESS;
262 }
263
264
265 /*
266   created a replmd_replicated_request context
267  */
268 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
269                                                          struct ldb_request *req)
270 {
271         struct ldb_context *ldb;
272         struct replmd_replicated_request *ac;
273
274         ldb = ldb_module_get_ctx(module);
275
276         ac = talloc_zero(req, struct replmd_replicated_request);
277         if (ac == NULL) {
278                 ldb_oom(ldb);
279                 return NULL;
280         }
281
282         ac->module = module;
283         ac->req = req;
284         return ac;
285 }
286
287 /*
288   add a time element to a record
289 */
290 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
291 {
292         struct ldb_message_element *el;
293         char *s;
294
295         if (ldb_msg_find_element(msg, attr) != NULL) {
296                 return LDB_SUCCESS;
297         }
298
299         s = ldb_timestring(msg, t);
300         if (s == NULL) {
301                 return LDB_ERR_OPERATIONS_ERROR;
302         }
303
304         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
305                 return LDB_ERR_OPERATIONS_ERROR;
306         }
307
308         el = ldb_msg_find_element(msg, attr);
309         /* always set as replace. This works because on add ops, the flag
310            is ignored */
311         el->flags = LDB_FLAG_MOD_REPLACE;
312
313         return LDB_SUCCESS;
314 }
315
316 /*
317   add a uint64_t element to a record
318 */
319 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
320 {
321         struct ldb_message_element *el;
322
323         if (ldb_msg_find_element(msg, attr) != NULL) {
324                 return LDB_SUCCESS;
325         }
326
327         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
328                 return LDB_ERR_OPERATIONS_ERROR;
329         }
330
331         el = ldb_msg_find_element(msg, attr);
332         /* always set as replace. This works because on add ops, the flag
333            is ignored */
334         el->flags = LDB_FLAG_MOD_REPLACE;
335
336         return LDB_SUCCESS;
337 }
338
339 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
340                                                    const struct replPropertyMetaData1 *m2,
341                                                    const uint32_t *rdn_attid)
342 {
343         if (m1->attid == m2->attid) {
344                 return 0;
345         }
346
347         /*
348          * the rdn attribute should be at the end!
349          * so we need to return a value greater than zero
350          * which means m1 is greater than m2
351          */
352         if (m1->attid == *rdn_attid) {
353                 return 1;
354         }
355
356         /*
357          * the rdn attribute should be at the end!
358          * so we need to return a value less than zero
359          * which means m2 is greater than m1
360          */
361         if (m2->attid == *rdn_attid) {
362                 return -1;
363         }
364
365         return m1->attid - m2->attid;
366 }
367
368 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
369                                                 const struct dsdb_schema *schema,
370                                                 struct ldb_dn *dn)
371 {
372         const char *rdn_name;
373         const struct dsdb_attribute *rdn_sa;
374
375         rdn_name = ldb_dn_get_rdn_name(dn);
376         if (!rdn_name) {
377                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
378                 return LDB_ERR_OPERATIONS_ERROR;
379         }
380
381         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
382         if (rdn_sa == NULL) {
383                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
384                 return LDB_ERR_OPERATIONS_ERROR;                
385         }
386
387         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
388                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
389
390         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
391                   discard_const_p(void, &rdn_sa->attributeID_id), 
392                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
393
394         return LDB_SUCCESS;
395 }
396
397 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
398                                                  const struct ldb_message_element *e2,
399                                                  const struct dsdb_schema *schema)
400 {
401         const struct dsdb_attribute *a1;
402         const struct dsdb_attribute *a2;
403
404         /* 
405          * TODO: make this faster by caching the dsdb_attribute pointer
406          *       on the ldb_messag_element
407          */
408
409         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
410         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
411
412         /*
413          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
414          *       in the schema
415          */
416         if (!a1 || !a2) {
417                 return strcasecmp(e1->name, e2->name);
418         }
419
420         return a1->attributeID_id - a2->attributeID_id;
421 }
422
423 static void replmd_ldb_message_sort(struct ldb_message *msg,
424                                     const struct dsdb_schema *schema)
425 {
426         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
427                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
428 }
429
430 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
431 {
432         struct ldb_context *ldb;
433         struct replmd_replicated_request *ac;
434         int ret;
435
436         ac = talloc_get_type(req->context, struct replmd_replicated_request);
437         ldb = ldb_module_get_ctx(ac->module);
438
439         if (!ares) {
440                 return ldb_module_done(ac->req, NULL, NULL,
441                                         LDB_ERR_OPERATIONS_ERROR);
442         }
443         if (ares->error != LDB_SUCCESS) {
444                 return ldb_module_done(ac->req, ares->controls,
445                                         ares->response, ares->error);
446         }
447
448         if (ares->type != LDB_REPLY_DONE) {
449                 ldb_set_errstring(ldb,
450                                   "invalid ldb_reply_type in callback");
451                 talloc_free(ares);
452                 return ldb_module_done(ac->req, NULL, NULL,
453                                         LDB_ERR_OPERATIONS_ERROR);
454         }
455
456         ret = replmd_notify(ac->module, req->op.add.message->dn, ac->seq_num);
457         if (ret != LDB_SUCCESS) {
458                 return ret;
459         }
460
461         return ldb_module_done(ac->req, ares->controls,
462                                 ares->response, LDB_SUCCESS);
463 }
464
/*
  handle an add request

  Requests on special DNs are passed down untouched. For all other
  requests a shallow copy of the message is rewritten before it is sent
  down the module chain:
   - objectGUID is generated, or taken from the request when the relax
     control is present (without the control a supplied objectGUID is
     refused with LDB_ERR_UNWILLING_TO_PERFORM)
   - whenCreated, whenChanged, uSNCreated, uSNChanged and
     replPropertyMetaData supplied by the caller are dropped and
     regenerated
   - instanceType is added if the request does not have one
   - the elements are sorted by attid

  The position at which each attribute is added matters: only elements
  that are in the message when the metadata array is built get a
  replPropertyMetaData entry.

  The request context is freed on every error return. Completion of the
  original request is done by replmd_op_callback().
*/
static int replmd_add(struct ldb_module *module, struct ldb_request *req)
{
        struct ldb_context *ldb;
        struct ldb_control *control;
        struct ldb_control **saved_controls;
        struct replmd_replicated_request *ac;
        const struct dsdb_schema *schema;
        enum ndr_err_code ndr_err;
        struct ldb_request *down_req;
        struct ldb_message *msg;
        const DATA_BLOB *guid_blob;
        struct GUID guid;
        struct ldb_val guid_value;
        struct replPropertyMetaDataBlob nmd;
        struct ldb_val nmd_value;
        const struct GUID *our_invocation_id;
        time_t t = time(NULL);
        NTTIME now;
        char *time_str;
        int ret;
        uint32_t i, ni=0;
        bool allow_add_guid = false;
        bool remove_current_guid = false;

        /* check if there's a relax control (used by provision to say 'I know what I'm doing') */
        control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
        if (control) {
                allow_add_guid = 1;
        }

        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.add.message->dn)) {
                return ldb_next_request(module, req);
        }

        ldb = ldb_module_get_ctx(module);

        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");

        schema = dsdb_get_schema(ldb);
        if (!schema) {
                ldb_debug_set(ldb, LDB_DEBUG_FATAL,
                              "replmd_add: no dsdb_schema loaded");
                DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
                return LDB_ERR_CONSTRAINT_VIOLATION;
        }

        ac = replmd_ctx_init(module, req);
        if (!ac) {
                return LDB_ERR_OPERATIONS_ERROR;
        }

        ac->schema = schema;

        guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
        if ( guid_blob != NULL ) {
                if( !allow_add_guid ) {
                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                              "replmd_add: it's not allowed to add an object with objectGUID\n");
                        talloc_free(ac);
                        return LDB_ERR_UNWILLING_TO_PERFORM;
                } else {
                        NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
                        if ( !NT_STATUS_IS_OK(status)) {
                                ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                                      "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
                                talloc_free(ac);
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        }
                        /* we remove this attribute as it can be a string and will not be treated 
                        correctly and then we will re-add it later on in the right format */
                        remove_current_guid = true;
                }
        } else {
                /* a new GUID */
                guid = GUID_random();
        }

        /* Get a sequence number from the backend */
        ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
        }

        /* get our invocationId */
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
                ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                              "replmd_add: unable to find invocationId\n");
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }

        /* we have to copy the message as the caller might have it as a const */
        msg = ldb_msg_copy_shallow(ac, req->op.add.message);
        if (msg == NULL) {
                ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }

        /* generated times - the same time is used for the string
           attributes and for the metadata entries */
        unix_to_nt_time(&now, t);
        time_str = ldb_timestring(msg, t);
        if (!time_str) {
                ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        if (remove_current_guid) {
                ldb_msg_remove_attr(msg,"objectGUID");
        }

        /* 
         * remove autogenerated attributes
         */
        ldb_msg_remove_attr(msg, "whenCreated");
        ldb_msg_remove_attr(msg, "whenChanged");
        ldb_msg_remove_attr(msg, "uSNCreated");
        ldb_msg_remove_attr(msg, "uSNChanged");
        ldb_msg_remove_attr(msg, "replPropertyMetaData");

        if (!ldb_msg_find_element(req->op.add.message, "instanceType")) {
                ret = ldb_msg_add_fmt(msg, "instanceType", "%u", INSTANCE_TYPE_WRITE);
                if (ret != LDB_SUCCESS) {
                        ldb_oom(ldb);
                        talloc_free(ac);
                        return ret;
                }
        }

        /*
         * readd replicated attributes
         * (whenCreated is added before the metadata is built, so it is
         * seen by the loop below)
         */
        ret = ldb_msg_add_string(msg, "whenCreated", time_str);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
                talloc_free(ac);
                return ret;
        }

        /* build the replication meta_data - the array is sized for one
           entry per element currently in the message, the real count
           is fixed up after the loop */
        ZERO_STRUCT(nmd);
        nmd.version             = 1;
        nmd.ctr.ctr1.count      = msg->num_elements;
        nmd.ctr.ctr1.array      = talloc_array(msg,
                                               struct replPropertyMetaData1,
                                               nmd.ctr.ctr1.count);
        if (!nmd.ctr.ctr1.array) {
                ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }

        /* i walks the message elements, ni is the next free slot in
           the metadata array */
        for (i=0; i < msg->num_elements; i++) {
                struct ldb_message_element *e = &msg->elements[i];
                struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
                const struct dsdb_attribute *sa;

                /* elements with a name starting with '@' get no metadata */
                if (e->name[0] == '@') continue;

                sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
                if (!sa) {
                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                                      "replmd_add: attribute '%s' not defined in schema\n",
                                      e->name);
                        talloc_free(ac);
                        return LDB_ERR_NO_SUCH_ATTRIBUTE;
                }

                if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
                        /* if the attribute is not replicated (0x00000001)
                         * or constructed (0x00000004) it has no metadata
                         */
                        continue;
                }

                m->attid                        = sa->attributeID_id;
                m->version                      = 1;
                m->originating_change_time      = now;
                m->originating_invocation_id    = *our_invocation_id;
                m->originating_usn              = ac->seq_num;
                m->local_usn                    = ac->seq_num;
                ni++;
        }

        /* fix meta data count */
        nmd.ctr.ctr1.count = ni;

        /*
         * sort meta data array, and move the rdn attribute entry to the end
         */
        ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, schema, msg->dn);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
        }

        /* generated NDR encoded values */
        ndr_err = ndr_push_struct_blob(&guid_value, msg, 
                                       NULL,
                                       &guid,
                                       (ndr_push_flags_fn_t)ndr_push_GUID);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
                                       lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
                                       &nmd,
                                       (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }

        /*
         * add the autogenerated values
         * (these are added after the metadata array was built, so they
         * have no entry in it)
         */
        ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
                talloc_free(ac);
                return ret;
        }
        ret = ldb_msg_add_string(msg, "whenChanged", time_str);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
                talloc_free(ac);
                return ret;
        }
        ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
                talloc_free(ac);
                return ret;
        }
        ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
                talloc_free(ac);
                return ret;
        }
        ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
                talloc_free(ac);
                return ret;
        }

        /*
         * sort the attributes by attid before storing the object
         */
        replmd_ldb_message_sort(msg, schema);

        /* the down request is a child of ac and completes through
           replmd_op_callback() */
        ret = ldb_build_add_req(&down_req, ldb, ac,
                                msg,
                                req->controls,
                                ac, replmd_op_callback,
                                req);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
        }

        /* if a control is there remove it from the modified request */
        if (control && !save_controls(control, down_req, &saved_controls)) {
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }

        /* go on with the call chain */
        return ldb_next_request(module, down_req);
}
742
743
744 /*
745  * update the replPropertyMetaData for one element
746  */
747 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
748                                       struct ldb_message *msg,
749                                       struct ldb_message_element *el,
750                                       struct replPropertyMetaDataBlob *omd,
751                                       struct dsdb_schema *schema,
752                                       uint64_t *seq_num,
753                                       const struct GUID *our_invocation_id,
754                                       NTTIME now)
755 {
756         int i;
757         const struct dsdb_attribute *a;
758         struct replPropertyMetaData1 *md1;
759
760         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
761         if (a == NULL) {
762                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
763                          el->name));
764                 return LDB_ERR_OPERATIONS_ERROR;
765         }
766
767         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
768                 return LDB_SUCCESS;
769         }
770
771         for (i=0; i<omd->ctr.ctr1.count; i++) {
772                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
773         }
774         if (i == omd->ctr.ctr1.count) {
775                 /* we need to add a new one */
776                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
777                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
778                 if (omd->ctr.ctr1.array == NULL) {
779                         ldb_oom(ldb);
780                         return LDB_ERR_OPERATIONS_ERROR;
781                 }
782                 omd->ctr.ctr1.count++;
783                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
784         }
785
786         /* Get a new sequence number from the backend. We only do this
787          * if we have a change that requires a new
788          * replPropertyMetaData element 
789          */
790         if (*seq_num == 0) {
791                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
792                 if (ret != LDB_SUCCESS) {
793                         return LDB_ERR_OPERATIONS_ERROR;
794                 }
795         }
796
797         md1 = &omd->ctr.ctr1.array[i];
798         md1->version++;
799         md1->attid                     = a->attributeID_id;
800         md1->originating_change_time   = now;
801         md1->originating_invocation_id = *our_invocation_id;
802         md1->originating_usn           = *seq_num;
803         md1->local_usn                 = *seq_num;
804         
805         return LDB_SUCCESS;
806 }
807
808 /*
809  * update the replPropertyMetaData object each time we modify an
810  * object. This is needed for DRS replication, as the merge on the
811  * client is based on this object 
812  */
813 static int replmd_update_rpmd(struct ldb_module *module, 
814                               struct ldb_message *msg, uint64_t *seq_num)
815 {
816         const struct ldb_val *omd_value;
817         enum ndr_err_code ndr_err;
818         struct replPropertyMetaDataBlob omd;
819         int i;
820         struct dsdb_schema *schema;
821         time_t t = time(NULL);
822         NTTIME now;
823         const struct GUID *our_invocation_id;
824         int ret;
825         const char *attrs[] = { "replPropertyMetaData" , NULL };
826         struct ldb_result *res;
827         struct ldb_context *ldb;
828
829         ldb = ldb_module_get_ctx(module);
830
831         our_invocation_id = samdb_ntds_invocation_id(ldb);
832         if (!our_invocation_id) {
833                 /* this happens during an initial vampire while
834                    updating the schema */
835                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
836                 return LDB_SUCCESS;
837         }
838
839         unix_to_nt_time(&now, t);
840
841         /* search for the existing replPropertyMetaDataBlob */
842         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
843         if (ret != LDB_SUCCESS || res->count != 1) {
844                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
845                          ldb_dn_get_linearized(msg->dn)));
846                 return LDB_ERR_OPERATIONS_ERROR;
847         }
848                 
849
850         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
851         if (!omd_value) {
852                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
853                          ldb_dn_get_linearized(msg->dn)));
854                 return LDB_ERR_OPERATIONS_ERROR;
855         }
856
857         ndr_err = ndr_pull_struct_blob(omd_value, msg,
858                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
859                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
860         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
861                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
862                          ldb_dn_get_linearized(msg->dn)));
863                 return LDB_ERR_OPERATIONS_ERROR;
864         }
865
866         if (omd.version != 1) {
867                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
868                          omd.version, ldb_dn_get_linearized(msg->dn)));
869                 return LDB_ERR_OPERATIONS_ERROR;
870         }
871
872         schema = dsdb_get_schema(ldb);
873
874         for (i=0; i<msg->num_elements; i++) {
875                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
876                                                  our_invocation_id, now);
877                 if (ret != LDB_SUCCESS) {
878                         return ret;
879                 }
880         }
881
882         /*
883          * replmd_update_rpmd_element has done an update if the
884          * seq_num is set
885          */
886         if (*seq_num != 0) {
887                 struct ldb_val *md_value;
888                 struct ldb_message_element *el;
889
890                 md_value = talloc(msg, struct ldb_val);
891                 if (md_value == NULL) {
892                         ldb_oom(ldb);
893                         return LDB_ERR_OPERATIONS_ERROR;
894                 }
895
896                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
897                 if (ret != LDB_SUCCESS) {
898                         return ret;
899                 }
900
901                 ndr_err = ndr_push_struct_blob(md_value, msg, 
902                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
903                                                &omd,
904                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
905                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
906                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
907                                  ldb_dn_get_linearized(msg->dn)));
908                         return LDB_ERR_OPERATIONS_ERROR;
909                 }
910
911                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
912                 if (ret != LDB_SUCCESS) {
913                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
914                                  ldb_dn_get_linearized(msg->dn)));
915                         return ret;
916                 }
917
918                 el->num_values = 1;
919                 el->values = md_value;
920         }
921
922         return LDB_SUCCESS;     
923 }
924
925
926 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
927 {
928         struct ldb_context *ldb;
929         struct replmd_replicated_request *ac;
930         const struct dsdb_schema *schema;
931         struct ldb_request *down_req;
932         struct ldb_message *msg;
933         struct ldb_result *res;
934         time_t t = time(NULL);
935         uint64_t seq_num = 0;
936         int ret;
937
938         /* do not manipulate our control entries */
939         if (ldb_dn_is_special(req->op.mod.message->dn)) {
940                 return ldb_next_request(module, req);
941         }
942
943         ldb = ldb_module_get_ctx(module);
944
945         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
946
947         schema = dsdb_get_schema(ldb);
948         if (!schema) {
949                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
950                               "replmd_modify: no dsdb_schema loaded");
951                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
952                 return LDB_ERR_CONSTRAINT_VIOLATION;
953         }
954
955         ac = replmd_ctx_init(module, req);
956         if (!ac) {
957                 return LDB_ERR_OPERATIONS_ERROR;
958         }
959
960         ac->schema = schema;
961
962         /* we have to copy the message as the caller might have it as a const */
963         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
964         if (msg == NULL) {
965                 ldb_oom(ldb);
966                 talloc_free(ac);
967                 return LDB_ERR_OPERATIONS_ERROR;
968         }
969
970         /* TODO:
971          * - give an error when a readonly attribute should
972          *   be modified
973          * - merge the changed into the old object
974          *   if the caller set values to the same value
975          *   ignore the attribute, return success when no
976          *   attribute was changed
977          */
978
979         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
980         if (ret != LDB_SUCCESS) {
981                 talloc_free(ac);
982                 return ret;
983         }
984
985         ret = replmd_update_rpmd(module, msg, &ac->seq_num);
986         if (ret != LDB_SUCCESS) {
987                 talloc_free(ac);
988                 return ret;
989         }
990
991         /* TODO:
992          * - replace the old object with the newly constructed one
993          */
994
995         ret = ldb_build_mod_req(&down_req, ldb, ac,
996                                 msg,
997                                 req->controls,
998                                 ac, replmd_op_callback,
999                                 req);
1000         if (ret != LDB_SUCCESS) {
1001                 talloc_free(ac);
1002                 return ret;
1003         }
1004         talloc_steal(down_req, msg);
1005
1006         /* we only change whenChanged and uSNChanged if the seq_num
1007            has changed */
1008         if (seq_num != 0) {
1009                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1010                         talloc_free(ac);
1011                         return ret;
1012                 }
1013
1014                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
1015                         talloc_free(ac);
1016                         return ret;
1017                 }
1018         }
1019
1020         /* go on with the call chain */
1021         return ldb_next_request(module, down_req);
1022 }
1023
1024
1025 /*
1026   handle a rename request
1027
1028   On a rename we need to do an extra ldb_modify which sets the
1029   whenChanged and uSNChanged attributes
1030  */
1031 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
1032 {
1033         struct ldb_context *ldb;
1034         int ret, i;
1035         time_t t = time(NULL);
1036         uint64_t seq_num = 0;
1037         struct ldb_message *msg;
1038         struct replmd_private *replmd_private = 
1039                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1040
1041         /* do not manipulate our control entries */
1042         if (ldb_dn_is_special(req->op.mod.message->dn)) {
1043                 return ldb_next_request(module, req);
1044         }
1045
1046         ldb = ldb_module_get_ctx(module);
1047
1048         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
1049
1050         /* Get a sequence number from the backend */
1051         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1052         if (ret != LDB_SUCCESS) {
1053                 return ret;
1054         }
1055
1056         msg = ldb_msg_new(req);
1057         if (msg == NULL) {
1058                 ldb_oom(ldb);
1059                 return LDB_ERR_OPERATIONS_ERROR;
1060         }
1061
1062         msg->dn = req->op.rename.olddn;
1063
1064         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1065                 talloc_free(msg);
1066                 return LDB_ERR_OPERATIONS_ERROR;
1067         }
1068         msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
1069
1070         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
1071                 talloc_free(msg);
1072                 return LDB_ERR_OPERATIONS_ERROR;
1073         }
1074         msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
1075
1076         ret = ldb_modify(ldb, msg);
1077         talloc_free(msg);
1078         if (ret != LDB_SUCCESS) {
1079                 return ret;
1080         }
1081
1082         ret = replmd_load_NCs(module);
1083         if (ret != 0) {
1084                 return ret;
1085         }
1086
1087         /* now update the highest uSNs of the partitions that are
1088            affected. Note that two partitions could be changing */
1089         for (i=0; i<replmd_private->num_ncs; i++) {
1090                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1091                                         req->op.rename.olddn) == 0) {
1092                         break;
1093                 }
1094         }
1095         if (i == replmd_private->num_ncs) {
1096                 DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
1097                          ldb_dn_get_linearized(req->op.rename.olddn)));
1098                 return LDB_ERR_OPERATIONS_ERROR;
1099         }
1100         replmd_private->ncs[i].mod_usn = seq_num;
1101
1102         for (i=0; i<replmd_private->num_ncs; i++) {
1103                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1104                                         req->op.rename.newdn) == 0) {
1105                         break;
1106                 }
1107         }
1108         if (i == replmd_private->num_ncs) {
1109                 DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
1110                          ldb_dn_get_linearized(req->op.rename.newdn)));
1111                 return LDB_ERR_OPERATIONS_ERROR;
1112         }
1113         replmd_private->ncs[i].mod_usn = seq_num;
1114         
1115         /* go on with the call chain */
1116         return ldb_next_request(module, req);
1117 }
1118
1119
/*
 * Error hook for the replicated-objects code path: hands the given ldb
 * error code back unchanged. The request context is not looked at.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	const int ldb_err = ret;

	return ldb_err;
}
1124
1125 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1126 {
1127         int ret = LDB_ERR_OTHER;
1128         /* TODO: do some error mapping */
1129         return ret;
1130 }
1131
1132 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1133
1134 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1135                                                 struct ldb_reply *ares)
1136 {
1137         struct ldb_context *ldb;
1138         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1139                                                struct replmd_replicated_request);
1140         int ret;
1141
1142         ldb = ldb_module_get_ctx(ar->module);
1143
1144         if (!ares) {
1145                 return ldb_module_done(ar->req, NULL, NULL,
1146                                         LDB_ERR_OPERATIONS_ERROR);
1147         }
1148         if (ares->error != LDB_SUCCESS) {
1149                 return ldb_module_done(ar->req, ares->controls,
1150                                         ares->response, ares->error);
1151         }
1152
1153         if (ares->type != LDB_REPLY_DONE) {
1154                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1155                 return ldb_module_done(ar->req, NULL, NULL,
1156                                         LDB_ERR_OPERATIONS_ERROR);
1157         }
1158
1159         talloc_free(ares);
1160         ar->index_current++;
1161
1162         ret = replmd_replicated_apply_next(ar);
1163         if (ret != LDB_SUCCESS) {
1164                 return ldb_module_done(ar->req, NULL, NULL, ret);
1165         }
1166
1167         return LDB_SUCCESS;
1168 }
1169
1170 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1171 {
1172         struct ldb_context *ldb;
1173         struct ldb_request *change_req;
1174         enum ndr_err_code ndr_err;
1175         struct ldb_message *msg;
1176         struct replPropertyMetaDataBlob *md;
1177         struct ldb_val md_value;
1178         uint32_t i;
1179         uint64_t seq_num;
1180         int ret;
1181
1182         /*
1183          * TODO: check if the parent object exist
1184          */
1185
1186         /*
1187          * TODO: handle the conflict case where an object with the
1188          *       same name exist
1189          */
1190
1191         ldb = ldb_module_get_ctx(ar->module);
1192         msg = ar->objs->objects[ar->index_current].msg;
1193         md = ar->objs->objects[ar->index_current].meta_data;
1194
1195         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1196         if (ret != LDB_SUCCESS) {
1197                 return replmd_replicated_request_error(ar, ret);
1198         }
1199
1200         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1201         if (ret != LDB_SUCCESS) {
1202                 return replmd_replicated_request_error(ar, ret);
1203         }
1204
1205         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1206         if (ret != LDB_SUCCESS) {
1207                 return replmd_replicated_request_error(ar, ret);
1208         }
1209
1210         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1211         if (ret != LDB_SUCCESS) {
1212                 return replmd_replicated_request_error(ar, ret);
1213         }
1214
1215         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1216         if (ret != LDB_SUCCESS) {
1217                 return replmd_replicated_request_error(ar, ret);
1218         }
1219
1220         ret = replmd_notify(ar->module, msg->dn, seq_num);
1221         if (ret != LDB_SUCCESS) {
1222                 return replmd_replicated_request_error(ar, ret);
1223         }
1224
1225         /* remove any message elements that have zero values */
1226         for (i=0; i<msg->num_elements; i++) {
1227                 if (msg->elements[i].num_values == 0) {
1228                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1229                                  msg->elements[i].name));
1230                         memmove(&msg->elements[i], 
1231                                 &msg->elements[i+1], 
1232                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1233                         msg->num_elements--;
1234                         i--;
1235                 }
1236         }
1237         
1238         /*
1239          * the meta data array is already sorted by the caller
1240          */
1241         for (i=0; i < md->ctr.ctr1.count; i++) {
1242                 md->ctr.ctr1.array[i].local_usn = seq_num;
1243         }
1244         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1245                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1246                                        md,
1247                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1248         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1249                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1250                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1251         }
1252         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1253         if (ret != LDB_SUCCESS) {
1254                 return replmd_replicated_request_error(ar, ret);
1255         }
1256
1257         replmd_ldb_message_sort(msg, ar->schema);
1258
1259         if (DEBUGLVL(4)) {
1260                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1261                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1262                 talloc_free(s);
1263         }
1264
1265         ret = ldb_build_add_req(&change_req,
1266                                 ldb,
1267                                 ar,
1268                                 msg,
1269                                 ar->controls,
1270                                 ar,
1271                                 replmd_replicated_apply_add_callback,
1272                                 ar->req);
1273         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1274
1275         return ldb_next_request(ar->module, change_req);
1276 }
1277
1278 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1279                                                          struct replPropertyMetaData1 *m2)
1280 {
1281         int ret;
1282
1283         if (m1->version != m2->version) {
1284                 return m1->version - m2->version;
1285         }
1286
1287         if (m1->originating_change_time != m2->originating_change_time) {
1288                 return m1->originating_change_time - m2->originating_change_time;
1289         }
1290
1291         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1292         if (ret != 0) {
1293                 return ret;
1294         }
1295
1296         return m1->originating_usn - m2->originating_usn;
1297 }
1298
1299 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1300                                                   struct ldb_reply *ares)
1301 {
1302         struct ldb_context *ldb;
1303         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1304                                                struct replmd_replicated_request);
1305         int ret;
1306
1307         ldb = ldb_module_get_ctx(ar->module);
1308
1309         if (!ares) {
1310                 return ldb_module_done(ar->req, NULL, NULL,
1311                                         LDB_ERR_OPERATIONS_ERROR);
1312         }
1313         if (ares->error != LDB_SUCCESS) {
1314                 return ldb_module_done(ar->req, ares->controls,
1315                                         ares->response, ares->error);
1316         }
1317
1318         if (ares->type != LDB_REPLY_DONE) {
1319                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1320                 return ldb_module_done(ar->req, NULL, NULL,
1321                                         LDB_ERR_OPERATIONS_ERROR);
1322         }
1323
1324         talloc_free(ares);
1325         ar->index_current++;
1326
1327         ret = replmd_replicated_apply_next(ar);
1328         if (ret != LDB_SUCCESS) {
1329                 return ldb_module_done(ar->req, NULL, NULL, ret);
1330         }
1331
1332         return LDB_SUCCESS;
1333 }
1334
1335 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1336 {
1337         struct ldb_context *ldb;
1338         struct ldb_request *change_req;
1339         enum ndr_err_code ndr_err;
1340         struct ldb_message *msg;
1341         struct replPropertyMetaDataBlob *rmd;
1342         struct replPropertyMetaDataBlob omd;
1343         const struct ldb_val *omd_value;
1344         struct replPropertyMetaDataBlob nmd;
1345         struct ldb_val nmd_value;
1346         uint32_t i,j,ni=0;
1347         uint32_t removed_attrs = 0;
1348         uint64_t seq_num;
1349         int ret;
1350
1351         ldb = ldb_module_get_ctx(ar->module);
1352         msg = ar->objs->objects[ar->index_current].msg;
1353         rmd = ar->objs->objects[ar->index_current].meta_data;
1354         ZERO_STRUCT(omd);
1355         omd.version = 1;
1356
1357         /*
1358          * TODO: check repl data is correct after a rename
1359          */
1360         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1361                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1362                           ldb_dn_get_linearized(ar->search_msg->dn),
1363                           ldb_dn_get_linearized(msg->dn));
1364                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1365                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1366                                   ldb_dn_get_linearized(ar->search_msg->dn),
1367                                   ldb_dn_get_linearized(msg->dn),
1368                                   ldb_errstring(ldb));
1369                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1370                 }
1371         }
1372
1373         /* find existing meta data */
1374         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1375         if (omd_value) {
1376                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1377                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1378                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1379                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1380                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1381                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1382                 }
1383
1384                 if (omd.version != 1) {
1385                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1386                 }
1387         }
1388
1389         ZERO_STRUCT(nmd);
1390         nmd.version = 1;
1391         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1392         nmd.ctr.ctr1.array = talloc_array(ar,
1393                                           struct replPropertyMetaData1,
1394                                           nmd.ctr.ctr1.count);
1395         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1396
1397         /* first copy the old meta data */
1398         for (i=0; i < omd.ctr.ctr1.count; i++) {
1399                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1400                 ni++;
1401         }
1402
1403         /* now merge in the new meta data */
1404         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1405                 bool found = false;
1406
1407                 for (j=0; j < ni; j++) {
1408                         int cmp;
1409
1410                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1411                                 continue;
1412                         }
1413
1414                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1415                                                                             &nmd.ctr.ctr1.array[j]);
1416                         if (cmp > 0) {
1417                                 /* replace the entry */
1418                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1419                                 found = true;
1420                                 break;
1421                         }
1422
1423                         /* we don't want to apply this change so remove the attribute */
1424                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1425                         removed_attrs++;
1426
1427                         found = true;
1428                         break;
1429                 }
1430
1431                 if (found) continue;
1432
1433                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1434                 ni++;
1435         }
1436
1437         /*
1438          * finally correct the size of the meta_data array
1439          */
1440         nmd.ctr.ctr1.count = ni;
1441
1442         /*
1443          * the rdn attribute (the alias for the name attribute),
1444          * 'cn' for most objects is the last entry in the meta data array
1445          * we have stored
1446          *
1447          * sort the new meta data array
1448          */
1449         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
1450         if (ret != LDB_SUCCESS) {
1451                 return ret;
1452         }
1453
1454         /*
1455          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1456          */
1457         if (msg->num_elements == 0) {
1458                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1459                           ar->index_current);
1460
1461                 ar->index_current++;
1462                 return replmd_replicated_apply_next(ar);
1463         }
1464
1465         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1466                   ar->index_current, msg->num_elements);
1467
1468         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1469         if (ret != LDB_SUCCESS) {
1470                 return replmd_replicated_request_error(ar, ret);
1471         }
1472
1473         for (i=0; i<ni; i++) {
1474                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1475         }
1476
1477         /* create the meta data value */
1478         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1479                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1480                                        &nmd,
1481                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1482         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1483                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1484                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1485         }
1486
1487         /*
1488          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1489          * and replPopertyMetaData attributes
1490          */
1491         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1492         if (ret != LDB_SUCCESS) {
1493                 return replmd_replicated_request_error(ar, ret);
1494         }
1495         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1496         if (ret != LDB_SUCCESS) {
1497                 return replmd_replicated_request_error(ar, ret);
1498         }
1499         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1500         if (ret != LDB_SUCCESS) {
1501                 return replmd_replicated_request_error(ar, ret);
1502         }
1503
1504         replmd_ldb_message_sort(msg, ar->schema);
1505
1506         /* we want to replace the old values */
1507         for (i=0; i < msg->num_elements; i++) {
1508                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1509         }
1510
1511         ret = replmd_notify(ar->module, msg->dn, seq_num);
1512         if (ret != LDB_SUCCESS) {
1513                 return replmd_replicated_request_error(ar, ret);
1514         }
1515
1516         if (DEBUGLVL(4)) {
1517                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1518                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1519                 talloc_free(s);
1520         }
1521
1522         ret = ldb_build_mod_req(&change_req,
1523                                 ldb,
1524                                 ar,
1525                                 msg,
1526                                 ar->controls,
1527                                 ar,
1528                                 replmd_replicated_apply_merge_callback,
1529                                 ar->req);
1530         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1531
1532         return ldb_next_request(ar->module, change_req);
1533 }
1534
1535 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1536                                                    struct ldb_reply *ares)
1537 {
1538         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1539                                                struct replmd_replicated_request);
1540         int ret;
1541
1542         if (!ares) {
1543                 return ldb_module_done(ar->req, NULL, NULL,
1544                                         LDB_ERR_OPERATIONS_ERROR);
1545         }
1546         if (ares->error != LDB_SUCCESS &&
1547             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1548                 return ldb_module_done(ar->req, ares->controls,
1549                                         ares->response, ares->error);
1550         }
1551
1552         switch (ares->type) {
1553         case LDB_REPLY_ENTRY:
1554                 ar->search_msg = talloc_steal(ar, ares->message);
1555                 break;
1556
1557         case LDB_REPLY_REFERRAL:
1558                 /* we ignore referrals */
1559                 break;
1560
1561         case LDB_REPLY_DONE:
1562                 if (ar->search_msg != NULL) {
1563                         ret = replmd_replicated_apply_merge(ar);
1564                 } else {
1565                         ret = replmd_replicated_apply_add(ar);
1566                 }
1567                 if (ret != LDB_SUCCESS) {
1568                         return ldb_module_done(ar->req, NULL, NULL, ret);
1569                 }
1570         }
1571
1572         talloc_free(ares);
1573         return LDB_SUCCESS;
1574 }
1575
1576 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1577
1578 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1579 {
1580         struct ldb_context *ldb;
1581         int ret;
1582         char *tmp_str;
1583         char *filter;
1584         struct ldb_request *search_req;
1585
1586         if (ar->index_current >= ar->objs->num_objects) {
1587                 /* done with it, go to next stage */
1588                 return replmd_replicated_uptodate_vector(ar);
1589         }
1590
1591         ldb = ldb_module_get_ctx(ar->module);
1592         ar->search_msg = NULL;
1593
1594         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1595         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1596
1597         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1598         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1599         talloc_free(tmp_str);
1600
1601         ret = ldb_build_search_req(&search_req,
1602                                    ldb,
1603                                    ar,
1604                                    ar->objs->partition_dn,
1605                                    LDB_SCOPE_SUBTREE,
1606                                    filter,
1607                                    NULL,
1608                                    NULL,
1609                                    ar,
1610                                    replmd_replicated_apply_search_callback,
1611                                    ar->req);
1612
1613         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
1614         if (ret != LDB_SUCCESS) {
1615                 return ret;
1616         }
1617         
1618
1619         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1620
1621         return ldb_next_request(ar->module, search_req);
1622 }
1623
1624 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1625                                                       struct ldb_reply *ares)
1626 {
1627         struct ldb_context *ldb;
1628         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1629                                                struct replmd_replicated_request);
1630         ldb = ldb_module_get_ctx(ar->module);
1631
1632         if (!ares) {
1633                 return ldb_module_done(ar->req, NULL, NULL,
1634                                         LDB_ERR_OPERATIONS_ERROR);
1635         }
1636         if (ares->error != LDB_SUCCESS) {
1637                 return ldb_module_done(ar->req, ares->controls,
1638                                         ares->response, ares->error);
1639         }
1640
1641         if (ares->type != LDB_REPLY_DONE) {
1642                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1643                 return ldb_module_done(ar->req, NULL, NULL,
1644                                         LDB_ERR_OPERATIONS_ERROR);
1645         }
1646
1647         talloc_free(ares);
1648
1649         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1650 }
1651
1652 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1653 {
1654         struct ldb_context *ldb;
1655         struct ldb_request *change_req;
1656         enum ndr_err_code ndr_err;
1657         struct ldb_message *msg;
1658         struct replUpToDateVectorBlob ouv;
1659         const struct ldb_val *ouv_value;
1660         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1661         struct replUpToDateVectorBlob nuv;
1662         struct ldb_val nuv_value;
1663         struct ldb_message_element *nuv_el = NULL;
1664         const struct GUID *our_invocation_id;
1665         struct ldb_message_element *orf_el = NULL;
1666         struct repsFromToBlob nrf;
1667         struct ldb_val *nrf_value = NULL;
1668         struct ldb_message_element *nrf_el = NULL;
1669         uint32_t i,j,ni=0;
1670         bool found = false;
1671         time_t t = time(NULL);
1672         NTTIME now;
1673         int ret;
1674
1675         ldb = ldb_module_get_ctx(ar->module);
1676         ruv = ar->objs->uptodateness_vector;
1677         ZERO_STRUCT(ouv);
1678         ouv.version = 2;
1679         ZERO_STRUCT(nuv);
1680         nuv.version = 2;
1681
1682         unix_to_nt_time(&now, t);
1683
1684         /*
1685          * first create the new replUpToDateVector
1686          */
1687         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1688         if (ouv_value) {
1689                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1690                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1691                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1692                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1693                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1694                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1695                 }
1696
1697                 if (ouv.version != 2) {
1698                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1699                 }
1700         }
1701
1702         /*
1703          * the new uptodateness vector will at least
1704          * contain 1 entry, one for the source_dsa
1705          *
1706          * plus optional values from our old vector and the one from the source_dsa
1707          */
1708         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1709         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1710         nuv.ctr.ctr2.cursors = talloc_array(ar,
1711                                             struct drsuapi_DsReplicaCursor2,
1712                                             nuv.ctr.ctr2.count);
1713         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1714
1715         /* first copy the old vector */
1716         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1717                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1718                 ni++;
1719         }
1720
1721         /* get our invocation_id if we have one already attached to the ldb */
1722         our_invocation_id = samdb_ntds_invocation_id(ldb);
1723
1724         /* merge in the source_dsa vector is available */
1725         for (i=0; (ruv && i < ruv->count); i++) {
1726                 found = false;
1727
1728                 if (our_invocation_id &&
1729                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1730                                our_invocation_id)) {
1731                         continue;
1732                 }
1733
1734                 for (j=0; j < ni; j++) {
1735                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1736                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1737                                 continue;
1738                         }
1739
1740                         found = true;
1741
1742                         /*
1743                          * we update only the highest_usn and not the latest_sync_success time,
1744                          * because the last success stands for direct replication
1745                          */
1746                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1747                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1748                         }
1749                         break;                  
1750                 }
1751
1752                 if (found) continue;
1753
1754                 /* if it's not there yet, add it */
1755                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1756                 ni++;
1757         }
1758
1759         /*
1760          * merge in the current highwatermark for the source_dsa
1761          */
1762         found = false;
1763         for (j=0; j < ni; j++) {
1764                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1765                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1766                         continue;
1767                 }
1768
1769                 found = true;
1770
1771                 /*
1772                  * here we update the highest_usn and last_sync_success time
1773                  * because we're directly replicating from the source_dsa
1774                  *
1775                  * and use the tmp_highest_usn because this is what we have just applied
1776                  * to our ldb
1777                  */
1778                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1779                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1780                 break;
1781         }
1782         if (!found) {
1783                 /*
1784                  * here we update the highest_usn and last_sync_success time
1785                  * because we're directly replicating from the source_dsa
1786                  *
1787                  * and use the tmp_highest_usn because this is what we have just applied
1788                  * to our ldb
1789                  */
1790                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1791                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1792                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1793                 ni++;
1794         }
1795
1796         /*
1797          * finally correct the size of the cursors array
1798          */
1799         nuv.ctr.ctr2.count = ni;
1800
1801         /*
1802          * sort the cursors
1803          */
1804         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1805               sizeof(struct drsuapi_DsReplicaCursor2),
1806               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1807
1808         /*
1809          * create the change ldb_message
1810          */
1811         msg = ldb_msg_new(ar);
1812         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1813         msg->dn = ar->search_msg->dn;
1814
1815         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1816                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1817                                        &nuv,
1818                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1819         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1820                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1821                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1822         }
1823         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1824         if (ret != LDB_SUCCESS) {
1825                 return replmd_replicated_request_error(ar, ret);
1826         }
1827         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1828
1829         /*
1830          * now create the new repsFrom value from the given repsFromTo1 structure
1831          */
1832         ZERO_STRUCT(nrf);
1833         nrf.version                                     = 1;
1834         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1835         /* and fix some values... */
1836         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1837         nrf.ctr.ctr1.last_success                       = now;
1838         nrf.ctr.ctr1.last_attempt                       = now;
1839         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1840         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1841
1842         /*
1843          * first see if we already have a repsFrom value for the current source dsa
1844          * if so we'll later replace this value
1845          */
1846         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1847         if (orf_el) {
1848                 for (i=0; i < orf_el->num_values; i++) {
1849                         struct repsFromToBlob *trf;
1850
1851                         trf = talloc(ar, struct repsFromToBlob);
1852                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1853
1854                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1855                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1856                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1857                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1858                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1859                         }
1860
1861                         if (trf->version != 1) {
1862                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1863                         }
1864
1865                         /*
1866                          * we compare the source dsa objectGUID not the invocation_id
1867                          * because we want only one repsFrom value per source dsa
1868                          * and when the invocation_id of the source dsa has changed we don't need 
1869                          * the old repsFrom with the old invocation_id
1870                          */
1871                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1872                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1873                                 talloc_free(trf);
1874                                 continue;
1875                         }
1876
1877                         talloc_free(trf);
1878                         nrf_value = &orf_el->values[i];
1879                         break;
1880                 }
1881
1882                 /*
1883                  * copy over all old values to the new ldb_message
1884                  */
1885                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1886                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1887                 *nrf_el = *orf_el;
1888         }
1889
1890         /*
1891          * if we haven't found an old repsFrom value for the current source dsa
1892          * we'll add a new value
1893          */
1894         if (!nrf_value) {
1895                 struct ldb_val zero_value;
1896                 ZERO_STRUCT(zero_value);
1897                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1898                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1899
1900                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1901         }
1902
1903         /* we now fill the value which is already attached to ldb_message */
1904         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1905                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1906                                        &nrf,
1907                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1908         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1909                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1910                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1911         }
1912
1913         /* 
1914          * the ldb_message_element for the attribute, has all the old values and the new one
1915          * so we'll replace the whole attribute with all values
1916          */
1917         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1918
1919         if (DEBUGLVL(4)) {
1920                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1921                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1922                 talloc_free(s);
1923         }
1924
1925         /* prepare the ldb_modify() request */
1926         ret = ldb_build_mod_req(&change_req,
1927                                 ldb,
1928                                 ar,
1929                                 msg,
1930                                 ar->controls,
1931                                 ar,
1932                                 replmd_replicated_uptodate_modify_callback,
1933                                 ar->req);
1934         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1935
1936         return ldb_next_request(ar->module, change_req);
1937 }
1938
1939 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1940                                                       struct ldb_reply *ares)
1941 {
1942         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1943                                                struct replmd_replicated_request);
1944         int ret;
1945
1946         if (!ares) {
1947                 return ldb_module_done(ar->req, NULL, NULL,
1948                                         LDB_ERR_OPERATIONS_ERROR);
1949         }
1950         if (ares->error != LDB_SUCCESS &&
1951             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1952                 return ldb_module_done(ar->req, ares->controls,
1953                                         ares->response, ares->error);
1954         }
1955
1956         switch (ares->type) {
1957         case LDB_REPLY_ENTRY:
1958                 ar->search_msg = talloc_steal(ar, ares->message);
1959                 break;
1960
1961         case LDB_REPLY_REFERRAL:
1962                 /* we ignore referrals */
1963                 break;
1964
1965         case LDB_REPLY_DONE:
1966                 if (ar->search_msg == NULL) {
1967                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1968                 } else {
1969                         ret = replmd_replicated_uptodate_modify(ar);
1970                 }
1971                 if (ret != LDB_SUCCESS) {
1972                         return ldb_module_done(ar->req, NULL, NULL, ret);
1973                 }
1974         }
1975
1976         talloc_free(ares);
1977         return LDB_SUCCESS;
1978 }
1979
1980
1981 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1982 {
1983         struct ldb_context *ldb;
1984         int ret;
1985         static const char *attrs[] = {
1986                 "replUpToDateVector",
1987                 "repsFrom",
1988                 NULL
1989         };
1990         struct ldb_request *search_req;
1991
1992         ldb = ldb_module_get_ctx(ar->module);
1993         ar->search_msg = NULL;
1994
1995         ret = ldb_build_search_req(&search_req,
1996                                    ldb,
1997                                    ar,
1998                                    ar->objs->partition_dn,
1999                                    LDB_SCOPE_BASE,
2000                                    "(objectClass=*)",
2001                                    attrs,
2002                                    NULL,
2003                                    ar,
2004                                    replmd_replicated_uptodate_search_callback,
2005                                    ar->req);
2006         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2007
2008         return ldb_next_request(ar->module, search_req);
2009 }
2010
2011
2012
2013 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
2014 {
2015         struct ldb_context *ldb;
2016         struct dsdb_extended_replicated_objects *objs;
2017         struct replmd_replicated_request *ar;
2018         struct ldb_control **ctrls;
2019         int ret, i;
2020         struct dsdb_control_current_partition *partition_ctrl;
2021         struct replmd_private *replmd_private = 
2022                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2023
2024         ldb = ldb_module_get_ctx(module);
2025
2026         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
2027
2028         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
2029         if (!objs) {
2030                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
2031                 return LDB_ERR_PROTOCOL_ERROR;
2032         }
2033
2034         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
2035                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
2036                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
2037                 return LDB_ERR_PROTOCOL_ERROR;
2038         }
2039
2040         ar = replmd_ctx_init(module, req);
2041         if (!ar)
2042                 return LDB_ERR_OPERATIONS_ERROR;
2043
2044         ar->objs = objs;
2045         ar->schema = dsdb_get_schema(ldb);
2046         if (!ar->schema) {
2047                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
2048                 talloc_free(ar);
2049                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
2050                 return LDB_ERR_CONSTRAINT_VIOLATION;
2051         }
2052
2053         ctrls = req->controls;
2054
2055         if (req->controls) {
2056                 req->controls = talloc_memdup(ar, req->controls,
2057                                               talloc_get_size(req->controls));
2058                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2059         }
2060
2061         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
2062         if (ret != LDB_SUCCESS) {
2063                 return ret;
2064         }
2065
2066         /*
2067           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
2068           tells the partition module which partition this request is
2069           directed at. That is important as the partition roots appear
2070           twice in the directory, once as mount points in the top
2071           level store, and once as the roots of each partition. The
2072           replication code wants to operate on the root of the
2073           partitions, not the top level mount points
2074          */
2075         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
2076         if (partition_ctrl == NULL) {
2077                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2078         }
2079         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
2080         partition_ctrl->dn = objs->partition_dn;
2081
2082         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
2083         if (ret != LDB_SUCCESS) {
2084                 return ret;
2085         }
2086
2087         ar->controls = req->controls;
2088         req->controls = ctrls;
2089
2090         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
2091
2092         /* save away the linked attributes for the end of the
2093            transaction */
2094         for (i=0; i<ar->objs->linked_attributes_count; i++) {
2095                 struct la_entry *la_entry;
2096
2097                 if (replmd_private->la_ctx == NULL) {
2098                         replmd_private->la_ctx = talloc_new(replmd_private);
2099                 }
2100                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
2101                 if (la_entry == NULL) {
2102                         ldb_oom(ldb);
2103                         return LDB_ERR_OPERATIONS_ERROR;
2104                 }
2105                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2106                 if (la_entry->la == NULL) {
2107                         talloc_free(la_entry);
2108                         ldb_oom(ldb);
2109                         return LDB_ERR_OPERATIONS_ERROR;
2110                 }
2111                 *la_entry->la = ar->objs->linked_attributes[i];
2112
2113                 /* we need to steal the non-scalars so they stay
2114                    around until the end of the transaction */
2115                 talloc_steal(la_entry->la, la_entry->la->identifier);
2116                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2117
2118                 DLIST_ADD(replmd_private->la_list, la_entry);
2119         }
2120
2121         return replmd_replicated_apply_next(ar);
2122 }
2123
2124 /*
2125   process one linked attribute structure
2126  */
2127 static int replmd_process_linked_attribute(struct ldb_module *module,
2128                                            struct la_entry *la_entry)
2129 {                                          
2130         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2131         struct ldb_context *ldb = ldb_module_get_ctx(module);
2132         struct drsuapi_DsReplicaObjectIdentifier3 target;
2133         struct ldb_message *msg;
2134         struct ldb_message_element *ret_el;
2135         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2136         enum ndr_err_code ndr_err;
2137         struct ldb_request *mod_req;
2138         int ret;
2139         const struct dsdb_attribute *attr;
2140         struct ldb_dn *target_dn;
2141         uint64_t seq_num = 0;
2142
2143 /*
2144 linked_attributes[0]:                                                     
2145      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2146         identifier               : *                                      
2147             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2148                 __ndr_size               : 0x0000003a (58)                
2149                 __ndr_size_sid           : 0x00000000 (0)                 
2150                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2151                 sid                      : S-0-0                               
2152                 __ndr_size_dn            : 0x00000000 (0)                      
2153                 dn                       : ''                                  
2154         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2155         value: struct drsuapi_DsAttributeValue                                 
2156             __ndr_size               : 0x0000007e (126)                        
2157             blob                     : *                                       
2158                 blob                     : DATA_BLOB length=126                
2159         flags                    : 0x00000001 (1)                              
2160                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2161         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2162         meta_data: struct drsuapi_DsReplicaMetaData                            
2163             version                  : 0x00000015 (21)                         
2164             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2165             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2166             originating_usn          : 0x000000000001e19c (123292)             
2167      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2168         __ndr_size               : 0x0000007e (126)                            
2169         __ndr_size_sid           : 0x0000001c (28)                             
2170         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2171         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2172         __ndr_size_dn            : 0x00000022 (34)                                
2173         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2174  */
2175         if (DEBUGLVL(4)) {
2176                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2177         }
2178         
2179         /* decode the target of the link */
2180         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2181                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2182                                        &target,
2183                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2184         if (ndr_err != NDR_ERR_SUCCESS) {
2185                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2186                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2187                 talloc_free(tmp_ctx);
2188                 return LDB_ERR_OPERATIONS_ERROR;
2189         }
2190         if (DEBUGLVL(4)) {
2191                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2192         }
2193
2194         /* construct a modify request for this attribute change */
2195         msg = ldb_msg_new(tmp_ctx);
2196         if (!msg) {
2197                 ldb_oom(ldb);
2198                 talloc_free(tmp_ctx);
2199                 return LDB_ERR_OPERATIONS_ERROR;
2200         }
2201
2202         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2203                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2204         if (ret != LDB_SUCCESS) {
2205                 talloc_free(tmp_ctx);
2206                 return ret;
2207         }
2208
2209         /* find the attribute being modified */
2210         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2211         if (attr == NULL) {
2212                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2213                 talloc_free(tmp_ctx);
2214                 return LDB_ERR_OPERATIONS_ERROR;
2215         }
2216
2217         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2218                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2219                                         LDB_FLAG_MOD_ADD, &ret_el);
2220         } else {
2221                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2222                                         LDB_FLAG_MOD_DELETE, &ret_el);
2223         }
2224         if (ret != LDB_SUCCESS) {
2225                 talloc_free(tmp_ctx);
2226                 return ret;
2227         }
2228         /* we allocate two entries here, in case we need a remove/add
2229            pair */
2230         ret_el->values = talloc_array(msg, struct ldb_val, 2);
2231         if (!ret_el->values) {
2232                 ldb_oom(ldb);
2233                 talloc_free(tmp_ctx);
2234                 return LDB_ERR_OPERATIONS_ERROR;
2235         }
2236         ret_el->num_values = 1;
2237
2238         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, GUID_string(tmp_ctx, &target.guid), &target_dn);
2239         if (ret != LDB_SUCCESS) {
2240                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
2241                 talloc_free(tmp_ctx);
2242                 return LDB_ERR_OPERATIONS_ERROR;
2243         }
2244
2245         ret_el->values[0].data = (uint8_t *)ldb_dn_get_extended_linearized(tmp_ctx, target_dn, 1);
2246         ret_el->values[0].length = strlen((char *)ret_el->values[0].data);
2247
2248         ret = replmd_update_rpmd(module, msg, &seq_num);
2249         if (ret != LDB_SUCCESS) {
2250                 talloc_free(tmp_ctx);
2251                 return ret;
2252         }
2253
2254         /* we only change whenChanged and uSNChanged if the seq_num
2255            has changed */
2256         if (seq_num != 0) {
2257                 time_t t = time(NULL);
2258
2259                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
2260                         talloc_free(tmp_ctx);
2261                         return LDB_ERR_OPERATIONS_ERROR;
2262                 }
2263
2264                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
2265                         talloc_free(tmp_ctx);
2266                         return LDB_ERR_OPERATIONS_ERROR;
2267                 }
2268         }
2269
2270         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2271                                 msg,
2272                                 NULL,
2273                                 NULL, 
2274                                 ldb_op_default_callback,
2275                                 NULL);
2276         if (ret != LDB_SUCCESS) {
2277                 talloc_free(tmp_ctx);
2278                 return ret;
2279         }
2280         talloc_steal(mod_req, msg);
2281
2282         if (DEBUGLVL(4)) {
2283                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2284                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2285         }
2286
2287         /* Run the new request */
2288         ret = ldb_next_request(module, mod_req);
2289
2290         /* we need to wait for this to finish, as we are being called
2291            from the synchronous end_transaction hook of this module */
2292         if (ret == LDB_SUCCESS) {
2293                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2294         }
2295
2296         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2297                 /* the link destination exists, we need to update it
2298                  * by deleting the old one for the same DN then adding
2299                  * the new one */
2300                 msg->elements = talloc_realloc(msg, msg->elements,
2301                                                struct ldb_message_element,
2302                                                msg->num_elements+1);
2303                 if (msg->elements == NULL) {
2304                         ldb_oom(ldb);
2305                         talloc_free(tmp_ctx);
2306                         return LDB_ERR_OPERATIONS_ERROR;
2307                 }
2308                 /* this relies on the backend matching the old entry
2309                    only by the DN portion of the extended DN */
2310                 msg->elements[1] = msg->elements[0];
2311                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2312                 msg->num_elements++;
2313
2314                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2315                                         msg,
2316                                         NULL,
2317                                         NULL, 
2318                                         ldb_op_default_callback,
2319                                         NULL);
2320                 if (ret != LDB_SUCCESS) {
2321                         talloc_free(tmp_ctx);
2322                         return ret;
2323                 }
2324
2325                 /* Run the new request */
2326                 ret = ldb_next_request(module, mod_req);
2327                 
2328                 if (ret == LDB_SUCCESS) {
2329                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2330                 }
2331         }
2332
2333         if (ret != LDB_SUCCESS) {
2334                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2335                           ldb_errstring(ldb),
2336                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2337                 ret = LDB_SUCCESS;
2338         }
2339         
2340         talloc_free(tmp_ctx);
2341
2342         return ret;     
2343 }
2344
2345 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2346 {
2347         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2348                 return replmd_extended_replicated_objects(module, req);
2349         }
2350
2351         return ldb_next_request(module, req);
2352 }
2353
2354
2355 /*
2356   we hook into the transaction operations to allow us to 
2357   perform the linked attribute updates at the end of the whole
2358   transaction. This allows a forward linked attribute to be created
2359   before the object is created. During a vampire, w2k8 sends us linked
2360   attributes before the objects they are part of.
2361  */
2362 static int replmd_start_transaction(struct ldb_module *module)
2363 {
2364         /* create our private structure for this transaction */
2365         int i;
2366         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2367                                                                 struct replmd_private);
2368         talloc_free(replmd_private->la_ctx);
2369         replmd_private->la_list = NULL;
2370         replmd_private->la_ctx = NULL;
2371
2372         for (i=0; i<replmd_private->num_ncs; i++) {
2373                 replmd_private->ncs[i].mod_usn = 0;
2374         }
2375
2376         return ldb_next_start_trans(module);
2377 }
2378
2379 /*
2380   on prepare commit we loop over our queued la_context structures and
2381   apply each of them  
2382  */
2383 static int replmd_prepare_commit(struct ldb_module *module)
2384 {
2385         struct replmd_private *replmd_private = 
2386                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2387         struct la_entry *la, *prev;
2388         int ret;
2389
2390         /* walk the list backwards, to do the first entry first, as we
2391          * added the entries with DLIST_ADD() which puts them at the
2392          * start of the list */
2393         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2394
2395         for (; la; la=prev) {
2396                 prev = la->prev;
2397                 DLIST_REMOVE(replmd_private->la_list, la);
2398                 ret = replmd_process_linked_attribute(module, la);
2399                 if (ret != LDB_SUCCESS) {
2400                         talloc_free(replmd_private->la_ctx);
2401                         replmd_private->la_list = NULL;
2402                         replmd_private->la_ctx = NULL;
2403                         return ret;
2404                 }
2405         }
2406
2407         talloc_free(replmd_private->la_ctx);
2408         replmd_private->la_list = NULL;
2409         replmd_private->la_ctx = NULL;
2410
2411         /* possibly change @REPLCHANGED */
2412         ret = replmd_notify_store(module);
2413         if (ret != LDB_SUCCESS) {
2414                 return ret;
2415         }
2416         
2417         return ldb_next_prepare_commit(module);
2418 }
2419
2420 static int replmd_del_transaction(struct ldb_module *module)
2421 {
2422         struct replmd_private *replmd_private = 
2423                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2424         talloc_free(replmd_private->la_ctx);
2425         replmd_private->la_list = NULL;
2426         replmd_private->la_ctx = NULL;
2427         return ldb_next_del_trans(module);
2428 }
2429
2430
2431 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2432         .name          = "repl_meta_data",
2433         .init_context      = replmd_init,
2434         .add               = replmd_add,
2435         .modify            = replmd_modify,
2436         .rename            = replmd_rename,
2437         .extended          = replmd_extended,
2438         .start_transaction = replmd_start_transaction,
2439         .prepare_commit    = replmd_prepare_commit,
2440         .del_transaction   = replmd_del_transaction,
2441 };