489985a22f3218f0e121ff6831832ec0cf791d46
[sfrench/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct ldb_dn *dn;
55                 struct GUID guid;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
/*
 * Node of a doubly linked list; each node refers to one
 * drsuapi_DsReplicaLinkedAttribute.
 */
struct la_entry {
	/* list linkage */
	struct la_entry *next;
	struct la_entry *prev;
	/* the linked attribute this node refers to */
	struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
 * Per-request context, created by replmd_ctx_init() as a talloc child
 * of the originating request.
 */
struct replmd_replicated_request {
	/* the module handling the request */
	struct ldb_module *module;
	/* the original request we have to answer */
	struct ldb_request *req;

	/* the schema in use for this request */
	const struct dsdb_schema *schema;

	/* NOTE(review): presumably the replicated objects being applied - confirm */
	struct dsdb_extended_replicated_objects *objs;

	/* the controls we pass down */
	struct ldb_control **controls;

	/* NOTE(review): presumably the index of the object in objs being processed - confirm */
	uint32_t index_current;

	/* NOTE(review): presumably the result of the search for the current object - confirm */
	struct ldb_message *search_msg;
};
80
81
82 /*
83   initialise the module
84   allocate the private structure and build the list
85   of partition DNs for use by replmd_notify()
86  */
87 static int replmd_init(struct ldb_module *module)
88 {
89         struct replmd_private *replmd_private;
90         struct ldb_context *ldb = ldb_module_get_ctx(module);
91
92         replmd_private = talloc_zero(module, struct replmd_private);
93         if (replmd_private == NULL) {
94                 ldb_oom(ldb);
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97         ldb_module_set_private(module, replmd_private);
98
99         return ldb_next_init(module);
100 }
101
102
103 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
104 {
105         return ldb_dn_compare(n1->dn, n2->dn);
106 }
107
108 /*
109   build the list of partition DNs for use by replmd_notify()
110  */
111 static int replmd_load_NCs(struct ldb_module *module)
112 {
113         const char *attrs[] = { "namingContexts", NULL };
114         struct ldb_result *res = NULL;
115         int i, ret;
116         TALLOC_CTX *tmp_ctx;
117         struct ldb_context *ldb;
118         struct ldb_message_element *el;
119         struct replmd_private *replmd_private = 
120                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
121
122         if (replmd_private->ncs != NULL) {
123                 return LDB_SUCCESS;
124         }
125
126         ldb = ldb_module_get_ctx(module);
127         tmp_ctx = talloc_new(module);
128
129         /* load the list of naming contexts */
130         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
131                          LDB_SCOPE_BASE, attrs, NULL);
132         if (ret != LDB_SUCCESS ||
133             res->count != 1) {
134                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137
138         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
139         if (el == NULL) {
140                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
141                 return LDB_ERR_OPERATIONS_ERROR;                
142         }
143
144         replmd_private->num_ncs = el->num_values;
145         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
146                                            replmd_private->num_ncs);
147         if (replmd_private->ncs == NULL) {
148                 ldb_oom(ldb);
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151
152         for (i=0; i<replmd_private->num_ncs; i++) {
153                 replmd_private->ncs[i].dn = 
154                         ldb_dn_from_ldb_val(replmd_private->ncs, 
155                                             ldb, &el->values[i]);
156                 replmd_private->ncs[i].mod_usn = 0;
157         }
158
159         talloc_free(res);
160
161         /* now find the GUIDs of each of those DNs */
162         for (i=0; i<replmd_private->num_ncs; i++) {
163                 const char *attrs2[] = { "objectGUID", NULL };
164                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
165                                  LDB_SCOPE_BASE, attrs2, NULL);
166                 if (ret != LDB_SUCCESS ||
167                     res->count != 1) {
168                         /* this happens when the schema is first being
169                            setup */
170                         talloc_free(replmd_private->ncs);
171                         replmd_private->ncs = NULL;
172                         replmd_private->num_ncs = 0;
173                         talloc_free(tmp_ctx);
174                         return LDB_SUCCESS;
175                 }
176                 replmd_private->ncs[i].guid = 
177                         samdb_result_guid(res->msgs[0], "objectGUID");
178                 talloc_free(res);
179         }       
180
181         /* sort the NCs into order, most to least specific */
182         qsort(replmd_private->ncs, replmd_private->num_ncs,
183               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
184
185         
186         talloc_free(tmp_ctx);
187         
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193  * notify the repl task that a object has changed. The notifies are
194  * gathered up in the replmd_private structure then written to the
195  * @REPLCHANGED object in each partition during the prepare_commit
196  */
197 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
198 {
199         int ret, i;
200         struct replmd_private *replmd_private = 
201                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
202
203         ret = replmd_load_NCs(module);
204         if (ret != LDB_SUCCESS) {
205                 return ret;
206         }
207         if (replmd_private->num_ncs == 0) {
208                 return LDB_SUCCESS;
209         }
210
211         for (i=0; i<replmd_private->num_ncs; i++) {
212                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
213                         break;
214                 }
215         }
216         if (i == replmd_private->num_ncs) {
217                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
218                          ldb_dn_get_linearized(dn)));
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         if (uSN > replmd_private->ncs[i].mod_usn) {
223             replmd_private->ncs[i].mod_usn = uSN;
224         }
225
226         return LDB_SUCCESS;
227 }
228
229
230 /*
231  * update a @REPLCHANGED record in each partition if there have been
232  * any writes of replicated data in the partition
233  */
234 static int replmd_notify_store(struct ldb_module *module)
235 {
236         int i;
237         struct replmd_private *replmd_private = 
238                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
239         struct ldb_context *ldb = ldb_module_get_ctx(module);
240
241         for (i=0; i<replmd_private->num_ncs; i++) {
242                 int ret;
243
244                 if (replmd_private->ncs[i].mod_usn == 0) {
245                         /* this partition has not changed in this
246                            transaction */
247                         continue;
248                 }
249
250                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
251                                               replmd_private->ncs[i].mod_usn);
252                 if (ret != LDB_SUCCESS) {
253                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
254                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
255                         return ret;
256                 }
257         }
258
259         return LDB_SUCCESS;
260 }
261
262
263 /*
264   created a replmd_replicated_request context
265  */
266 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
267                                                          struct ldb_request *req)
268 {
269         struct ldb_context *ldb;
270         struct replmd_replicated_request *ac;
271
272         ldb = ldb_module_get_ctx(module);
273
274         ac = talloc_zero(req, struct replmd_replicated_request);
275         if (ac == NULL) {
276                 ldb_oom(ldb);
277                 return NULL;
278         }
279
280         ac->module = module;
281         ac->req = req;
282         return ac;
283 }
284
285 /*
286   add a time element to a record
287 */
288 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
289 {
290         struct ldb_message_element *el;
291         char *s;
292
293         if (ldb_msg_find_element(msg, attr) != NULL) {
294                 return LDB_SUCCESS;
295         }
296
297         s = ldb_timestring(msg, t);
298         if (s == NULL) {
299                 return LDB_ERR_OPERATIONS_ERROR;
300         }
301
302         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305
306         el = ldb_msg_find_element(msg, attr);
307         /* always set as replace. This works because on add ops, the flag
308            is ignored */
309         el->flags = LDB_FLAG_MOD_REPLACE;
310
311         return LDB_SUCCESS;
312 }
313
314 /*
315   add a uint64_t element to a record
316 */
317 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
318 {
319         struct ldb_message_element *el;
320
321         if (ldb_msg_find_element(msg, attr) != NULL) {
322                 return LDB_SUCCESS;
323         }
324
325         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
326                 return LDB_ERR_OPERATIONS_ERROR;
327         }
328
329         el = ldb_msg_find_element(msg, attr);
330         /* always set as replace. This works because on add ops, the flag
331            is ignored */
332         el->flags = LDB_FLAG_MOD_REPLACE;
333
334         return LDB_SUCCESS;
335 }
336
337 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
338                                                    const struct replPropertyMetaData1 *m2,
339                                                    const uint32_t *rdn_attid)
340 {
341         if (m1->attid == m2->attid) {
342                 return 0;
343         }
344
345         /*
346          * the rdn attribute should be at the end!
347          * so we need to return a value greater than zero
348          * which means m1 is greater than m2
349          */
350         if (m1->attid == *rdn_attid) {
351                 return 1;
352         }
353
354         /*
355          * the rdn attribute should be at the end!
356          * so we need to return a value less than zero
357          * which means m2 is greater than m1
358          */
359         if (m2->attid == *rdn_attid) {
360                 return -1;
361         }
362
363         return m1->attid - m2->attid;
364 }
365
366 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
367                                                 const struct dsdb_schema *schema,
368                                                 struct ldb_dn *dn)
369 {
370         const char *rdn_name;
371         const struct dsdb_attribute *rdn_sa;
372
373         rdn_name = ldb_dn_get_rdn_name(dn);
374         if (!rdn_name) {
375                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
376                 return LDB_ERR_OPERATIONS_ERROR;
377         }
378
379         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
380         if (rdn_sa == NULL) {
381                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
382                 return LDB_ERR_OPERATIONS_ERROR;                
383         }
384
385         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
386                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
387
388         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
389                   discard_const_p(void, &rdn_sa->attributeID_id), 
390                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
391
392         return LDB_SUCCESS;
393 }
394
395 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
396                                                  const struct ldb_message_element *e2,
397                                                  const struct dsdb_schema *schema)
398 {
399         const struct dsdb_attribute *a1;
400         const struct dsdb_attribute *a2;
401
402         /* 
403          * TODO: make this faster by caching the dsdb_attribute pointer
404          *       on the ldb_messag_element
405          */
406
407         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
408         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
409
410         /*
411          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
412          *       in the schema
413          */
414         if (!a1 || !a2) {
415                 return strcasecmp(e1->name, e2->name);
416         }
417
418         return a1->attributeID_id - a2->attributeID_id;
419 }
420
421 static void replmd_ldb_message_sort(struct ldb_message *msg,
422                                     const struct dsdb_schema *schema)
423 {
424         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
425                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
426 }
427
428 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
429 {
430         struct ldb_context *ldb;
431         struct replmd_replicated_request *ac;
432
433         ac = talloc_get_type(req->context, struct replmd_replicated_request);
434         ldb = ldb_module_get_ctx(ac->module);
435
436         if (!ares) {
437                 return ldb_module_done(ac->req, NULL, NULL,
438                                         LDB_ERR_OPERATIONS_ERROR);
439         }
440         if (ares->error != LDB_SUCCESS) {
441                 return ldb_module_done(ac->req, ares->controls,
442                                         ares->response, ares->error);
443         }
444
445         if (ares->type != LDB_REPLY_DONE) {
446                 ldb_set_errstring(ldb,
447                                   "invalid ldb_reply_type in callback");
448                 talloc_free(ares);
449                 return ldb_module_done(ac->req, NULL, NULL,
450                                         LDB_ERR_OPERATIONS_ERROR);
451         }
452
453         return ldb_module_done(ac->req, ares->controls,
454                                 ares->response, LDB_SUCCESS);
455 }
456
/*
 * Handle an add request.
 *
 * Requests on special DNs are passed down untouched. For all other
 * objects a shallow copy of the message is made and:
 *  - an objectGUID supplied by the caller is only accepted when the
 *    request carries the LDB_CONTROL_RELAX_OID control (otherwise
 *    LDB_ERR_UNWILLING_TO_PERFORM is returned); without a supplied
 *    objectGUID a random GUID is generated,
 *  - any caller supplied whenCreated, whenChanged, uSNCreated,
 *    uSNChanged and replPropertyMetaData values are dropped,
 *  - instanceType is added if the original message did not have it,
 *  - whenCreated, objectGUID, whenChanged, uSNCreated, uSNChanged and
 *    replPropertyMetaData are (re)generated from the current time, the
 *    next sequence number and our invocationId,
 *  - the elements are sorted with replmd_ldb_message_sort().
 * The change is recorded with replmd_notify() and the rewritten
 * message is sent down the module chain; replmd_op_callback()
 * completes the original request.
 *
 * Returns LDB_ERR_CONSTRAINT_VIOLATION if no schema is loaded,
 * LDB_ERR_NO_SUCH_ATTRIBUTE if an element is not defined in the
 * schema, and otherwise the error of whichever step failed.
 */
457 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
458 {
459         struct ldb_context *ldb;
460         struct ldb_control *control;
461         struct ldb_control **saved_controls;
462         struct replmd_replicated_request *ac;
463         const struct dsdb_schema *schema;
464         enum ndr_err_code ndr_err;
465         struct ldb_request *down_req;
466         struct ldb_message *msg;
467         const DATA_BLOB *guid_blob;
468         struct GUID guid;
469         struct ldb_val guid_value;
470         struct replPropertyMetaDataBlob nmd;
471         struct ldb_val nmd_value;
472         uint64_t seq_num;
473         const struct GUID *our_invocation_id;
474         time_t t = time(NULL);
475         NTTIME now;
476         char *time_str;
477         int ret;
478         uint32_t i, ni=0;
479         int allow_add_guid=0;
480         int remove_current_guid=0;
481
482         /* check if there's a relax control (LDB_CONTROL_RELAX_OID); only
                   then may the caller supply its own objectGUID */
483         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
484         if (control) {
485                 allow_add_guid = 1;
486         }
487
488         /* do not manipulate our control entries */
489         if (ldb_dn_is_special(req->op.add.message->dn)) {
490                 return ldb_next_request(module, req);
491         }
492
493         ldb = ldb_module_get_ctx(module);
494
495         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
496
497         schema = dsdb_get_schema(ldb);
498         if (!schema) {
499                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
500                               "replmd_add: no dsdb_schema loaded");
501                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
502                 return LDB_ERR_CONSTRAINT_VIOLATION;
503         }
504
                /* ac is a talloc child of req; every error path below frees it,
                   which also releases msg and everything allocated on msg */
505         ac = replmd_ctx_init(module, req);
506         if (!ac) {
507                 return LDB_ERR_OPERATIONS_ERROR;
508         }
509
510         ac->schema = schema;
511
512         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
513         if ( guid_blob != NULL ) {
514                 if( !allow_add_guid ) {
515                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
516                               "replmd_add: it's not allowed to add an object with objectGUID\n");
517                         talloc_free(ac);
518                         return LDB_ERR_UNWILLING_TO_PERFORM;
519                 } else {
520                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
521                         if ( !NT_STATUS_IS_OK(status)) {
522                                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
523                                       "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
524                                 talloc_free(ac);
525                                 return LDB_ERR_UNWILLING_TO_PERFORM;
526                         }
527                         /* we remove this attribute as it can be a string and will not be treated 
528                         correctly and then we will re-add it later on in the correct format */
529                         remove_current_guid = 1;
530                 }
531         } else {
532                 /* a new GUID */
533                 guid = GUID_random();
534         }
535
536         /* Get a sequence number from the backend */
537         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
538         if (ret != LDB_SUCCESS) {
539                 talloc_free(ac);
540                 return ret;
541         }
542
543         /* get our invocationId */
544         our_invocation_id = samdb_ntds_invocation_id(ldb);
545         if (!our_invocation_id) {
546                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
547                               "replmd_add: unable to find invocationId\n");
548                 talloc_free(ac);
549                 return LDB_ERR_OPERATIONS_ERROR;
550         }
551
552         /* we have to copy the message as the caller might have it as a const */
553         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
554         if (msg == NULL) {
555                 ldb_oom(ldb);
556                 talloc_free(ac);
557                 return LDB_ERR_OPERATIONS_ERROR;
558         }
559
560         /* generated times */
561         unix_to_nt_time(&now, t);
562         time_str = ldb_timestring(msg, t);
563         if (!time_str) {
564                 ldb_oom(ldb);
565                 talloc_free(ac);
566                 return LDB_ERR_OPERATIONS_ERROR;
567         }
568         if (remove_current_guid) {
569                 ldb_msg_remove_attr(msg,"objectGUID");
570         }
571
572         /* 
573          * remove autogenerated attributes
574          */
575         ldb_msg_remove_attr(msg, "whenCreated");
576         ldb_msg_remove_attr(msg, "whenChanged");
577         ldb_msg_remove_attr(msg, "uSNCreated");
578         ldb_msg_remove_attr(msg, "uSNChanged");
579         ldb_msg_remove_attr(msg, "replPropertyMetaData");
580
581         if (!ldb_msg_find_element(req->op.add.message, "instanceType")) {
582                 ret = ldb_msg_add_fmt(msg, "instanceType", "%u", INSTANCE_TYPE_WRITE);
583                 if (ret != LDB_SUCCESS) {
584                         ldb_oom(ldb);
585                         talloc_free(ac);
586                         return ret;
587                 }
588         }
589
590         /*
591          * readd replicated attributes
592          */
593         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
594         if (ret != LDB_SUCCESS) {
595                 ldb_oom(ldb);
596                 talloc_free(ac);
597                 return ret;
598         }
599
600         /* build the replication meta_data */
                /*
                 * The array is sized for every element currently in msg; the
                 * real count is fixed up after the loop. Elements added further
                 * down (objectGUID, whenChanged, uSNCreated, uSNChanged,
                 * replPropertyMetaData) are not in msg yet and so get no entry
                 * here.
                 */
601         ZERO_STRUCT(nmd);
602         nmd.version             = 1;
603         nmd.ctr.ctr1.count      = msg->num_elements;
604         nmd.ctr.ctr1.array      = talloc_array(msg,
605                                                struct replPropertyMetaData1,
606                                                nmd.ctr.ctr1.count);
607         if (!nmd.ctr.ctr1.array) {
608                 ldb_oom(ldb);
609                 talloc_free(ac);
610                 return LDB_ERR_OPERATIONS_ERROR;
611         }
612
                /* i walks the message elements, ni counts the metadata entries
                   actually filled in (skipped elements do not advance ni) */
613         for (i=0; i < msg->num_elements; i++) {
614                 struct ldb_message_element *e = &msg->elements[i];
615                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
616                 const struct dsdb_attribute *sa;
617
618                 if (e->name[0] == '@') continue;
619
620                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
621                 if (!sa) {
622                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
623                                       "replmd_add: attribute '%s' not defined in schema\n",
624                                       e->name);
625                         talloc_free(ac);
626                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
627                 }
628
629                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
630                         /* if the attribute is not replicated (0x00000001)
631                          * or constructed (0x00000004) it has no metadata
632                          */
633                         continue;
634                 }
635
636                 m->attid                        = sa->attributeID_id;
637                 m->version                      = 1;
638                 m->originating_change_time      = now;
639                 m->originating_invocation_id    = *our_invocation_id;
640                 m->originating_usn              = seq_num;
641                 m->local_usn                    = seq_num;
642                 ni++;
643         }
644
645         /* fix meta data count */
646         nmd.ctr.ctr1.count = ni;
647
648         /*
649          * sort meta data array, and move the rdn attribute entry to the end
650          */
651         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, schema, msg->dn);
652         if (ret != LDB_SUCCESS) {
653                 talloc_free(ac);
654                 return ret;
655         }
656
657         /* generated NDR encoded values */
658         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
659                                        NULL,
660                                        &guid,
661                                        (ndr_push_flags_fn_t)ndr_push_GUID);
662         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
663                 ldb_oom(ldb);
664                 talloc_free(ac);
665                 return LDB_ERR_OPERATIONS_ERROR;
666         }
667         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
668                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
669                                        &nmd,
670                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
671         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
672                 ldb_oom(ldb);
673                 talloc_free(ac);
674                 return LDB_ERR_OPERATIONS_ERROR;
675         }
676
677         /*
678          * add the autogenerated values
679          */
680         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
681         if (ret != LDB_SUCCESS) {
682                 ldb_oom(ldb);
683                 talloc_free(ac);
684                 return ret;
685         }
686         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
687         if (ret != LDB_SUCCESS) {
688                 ldb_oom(ldb);
689                 talloc_free(ac);
690                 return ret;
691         }
692         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
693         if (ret != LDB_SUCCESS) {
694                 ldb_oom(ldb);
695                 talloc_free(ac);
696                 return ret;
697         }
698         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
699         if (ret != LDB_SUCCESS) {
700                 ldb_oom(ldb);
701                 talloc_free(ac);
702                 return ret;
703         }
704         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
705         if (ret != LDB_SUCCESS) {
706                 ldb_oom(ldb);
707                 talloc_free(ac);
708                 return ret;
709         }
710
711         /*
712          * sort the attributes by attid before storing the object
713          */
714         replmd_ldb_message_sort(msg, schema);
715
716         ret = ldb_build_add_req(&down_req, ldb, ac,
717                                 msg,
718                                 req->controls,
719                                 ac, replmd_op_callback,
720                                 req);
721         if (ret != LDB_SUCCESS) {
722                 talloc_free(ac);
723                 return ret;
724         }
725
                /* remember the new uSN for the partition containing this object */
726         ret = replmd_notify(module, msg->dn, seq_num);
727         if (ret != LDB_SUCCESS) {
728                 talloc_free(ac);
729                 return ret;
730         }
731
732         /* if a control is there remove it from the modified request
                   (NOTE(review): relies on save_controls() doing the removal - confirm) */
733         if (control && !save_controls(control, down_req, &saved_controls)) {
734                 talloc_free(ac);
735                 return LDB_ERR_OPERATIONS_ERROR;
736         }
737
738         /* go on with the call chain */
739         return ldb_next_request(module, down_req);
740 }
741
742
743 /*
744  * update the replPropertyMetaData for one element
745  */
746 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
747                                       struct ldb_message *msg,
748                                       struct ldb_message_element *el,
749                                       struct replPropertyMetaDataBlob *omd,
750                                       struct dsdb_schema *schema,
751                                       uint64_t *seq_num,
752                                       const struct GUID *our_invocation_id,
753                                       NTTIME now)
754 {
755         int i;
756         const struct dsdb_attribute *a;
757         struct replPropertyMetaData1 *md1;
758
759         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
760         if (a == NULL) {
761                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
762                          el->name));
763                 return LDB_ERR_OPERATIONS_ERROR;
764         }
765
766         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
767                 return LDB_SUCCESS;
768         }
769
770         for (i=0; i<omd->ctr.ctr1.count; i++) {
771                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
772         }
773         if (i == omd->ctr.ctr1.count) {
774                 /* we need to add a new one */
775                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
776                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
777                 if (omd->ctr.ctr1.array == NULL) {
778                         ldb_oom(ldb);
779                         return LDB_ERR_OPERATIONS_ERROR;
780                 }
781                 omd->ctr.ctr1.count++;
782                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
783         }
784
785         /* Get a new sequence number from the backend. We only do this
786          * if we have a change that requires a new
787          * replPropertyMetaData element 
788          */
789         if (*seq_num == 0) {
790                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
791                 if (ret != LDB_SUCCESS) {
792                         return LDB_ERR_OPERATIONS_ERROR;
793                 }
794         }
795
796         md1 = &omd->ctr.ctr1.array[i];
797         md1->version++;
798         md1->attid                     = a->attributeID_id;
799         md1->originating_change_time   = now;
800         md1->originating_invocation_id = *our_invocation_id;
801         md1->originating_usn           = *seq_num;
802         md1->local_usn                 = *seq_num;
803         
804         return LDB_SUCCESS;
805 }
806
807 /*
808  * update the replPropertyMetaData object each time we modify an
809  * object. This is needed for DRS replication, as the merge on the
810  * client is based on this object 
811  */
812 static int replmd_update_rpmd(struct ldb_module *module, 
813                               struct ldb_message *msg, uint64_t *seq_num)
814 {
815         const struct ldb_val *omd_value;
816         enum ndr_err_code ndr_err;
817         struct replPropertyMetaDataBlob omd;
818         int i;
819         struct dsdb_schema *schema;
820         time_t t = time(NULL);
821         NTTIME now;
822         const struct GUID *our_invocation_id;
823         int ret;
824         const char *attrs[] = { "replPropertyMetaData" , NULL };
825         struct ldb_result *res;
826         struct ldb_context *ldb;
827
828         ldb = ldb_module_get_ctx(module);
829
830         our_invocation_id = samdb_ntds_invocation_id(ldb);
831         if (!our_invocation_id) {
832                 /* this happens during an initial vampire while
833                    updating the schema */
834                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
835                 return LDB_SUCCESS;
836         }
837
838         unix_to_nt_time(&now, t);
839
840         /* search for the existing replPropertyMetaDataBlob */
841         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
842         if (ret != LDB_SUCCESS || res->count != 1) {
843                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
844                          ldb_dn_get_linearized(msg->dn)));
845                 return LDB_ERR_OPERATIONS_ERROR;
846         }
847                 
848
849         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
850         if (!omd_value) {
851                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
852                          ldb_dn_get_linearized(msg->dn)));
853                 return LDB_ERR_OPERATIONS_ERROR;
854         }
855
856         ndr_err = ndr_pull_struct_blob(omd_value, msg,
857                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
858                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
859         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
860                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
861                          ldb_dn_get_linearized(msg->dn)));
862                 return LDB_ERR_OPERATIONS_ERROR;
863         }
864
865         if (omd.version != 1) {
866                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
867                          omd.version, ldb_dn_get_linearized(msg->dn)));
868                 return LDB_ERR_OPERATIONS_ERROR;
869         }
870
871         schema = dsdb_get_schema(ldb);
872
873         for (i=0; i<msg->num_elements; i++) {
874                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
875                                                  our_invocation_id, now);
876                 if (ret != LDB_SUCCESS) {
877                         return ret;
878                 }
879         }
880
881         /*
882          * replmd_update_rpmd_element has done an update if the
883          * seq_num is set
884          */
885         if (*seq_num != 0) {
886                 struct ldb_val *md_value;
887                 struct ldb_message_element *el;
888
889                 md_value = talloc(msg, struct ldb_val);
890                 if (md_value == NULL) {
891                         ldb_oom(ldb);
892                         return LDB_ERR_OPERATIONS_ERROR;
893                 }
894
895                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
896                 if (ret != LDB_SUCCESS) {
897                         return ret;
898                 }
899
900                 ndr_err = ndr_push_struct_blob(md_value, msg, 
901                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
902                                                &omd,
903                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
904                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
905                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
906                                  ldb_dn_get_linearized(msg->dn)));
907                         return LDB_ERR_OPERATIONS_ERROR;
908                 }
909
910                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
911                 if (ret != LDB_SUCCESS) {
912                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
913                                  ldb_dn_get_linearized(msg->dn)));
914                         return ret;
915                 }
916
917                 ret = replmd_notify(module, msg->dn, *seq_num);
918                 if (ret != LDB_SUCCESS) {
919                         return ret;
920                 }
921
922                 el->num_values = 1;
923                 el->values = md_value;
924         }
925
926         return LDB_SUCCESS;     
927 }
928
929
930 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
931 {
932         struct ldb_context *ldb;
933         struct replmd_replicated_request *ac;
934         const struct dsdb_schema *schema;
935         struct ldb_request *down_req;
936         struct ldb_message *msg;
937         struct ldb_result *res;
938         time_t t = time(NULL);
939         uint64_t seq_num = 0;
940         int ret;
941
942         /* do not manipulate our control entries */
943         if (ldb_dn_is_special(req->op.mod.message->dn)) {
944                 return ldb_next_request(module, req);
945         }
946
947         ldb = ldb_module_get_ctx(module);
948
949         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
950
951         schema = dsdb_get_schema(ldb);
952         if (!schema) {
953                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
954                               "replmd_modify: no dsdb_schema loaded");
955                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
956                 return LDB_ERR_CONSTRAINT_VIOLATION;
957         }
958
959         ac = replmd_ctx_init(module, req);
960         if (!ac) {
961                 return LDB_ERR_OPERATIONS_ERROR;
962         }
963
964         ac->schema = schema;
965
966         /* we have to copy the message as the caller might have it as a const */
967         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
968         if (msg == NULL) {
969                 ldb_oom(ldb);
970                 talloc_free(ac);
971                 return LDB_ERR_OPERATIONS_ERROR;
972         }
973
974         /* TODO:
975          * - give an error when a readonly attribute should
976          *   be modified
977          * - merge the changed into the old object
978          *   if the caller set values to the same value
979          *   ignore the attribute, return success when no
980          *   attribute was changed
981          */
982
983         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
984         if (ret != LDB_SUCCESS) {
985                 talloc_free(ac);
986                 return ret;
987         }
988
989         ret = replmd_update_rpmd(module, msg, &seq_num);
990         if (ret != LDB_SUCCESS) {
991                 talloc_free(ac);
992                 return ret;
993         }
994
995         /* TODO:
996          * - replace the old object with the newly constructed one
997          */
998
999         ret = ldb_build_mod_req(&down_req, ldb, ac,
1000                                 msg,
1001                                 req->controls,
1002                                 ac, replmd_op_callback,
1003                                 req);
1004         if (ret != LDB_SUCCESS) {
1005                 talloc_free(ac);
1006                 return ret;
1007         }
1008         talloc_steal(down_req, msg);
1009
1010         /* we only change whenChanged and uSNChanged if the seq_num
1011            has changed */
1012         if (seq_num != 0) {
1013                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1014                         talloc_free(ac);
1015                         return ret;
1016                 }
1017
1018                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
1019                         talloc_free(ac);
1020                         return ret;
1021                 }
1022         }
1023
1024         /* go on with the call chain */
1025         return ldb_next_request(module, down_req);
1026 }
1027
1028
1029 /*
1030   handle a rename request
1031
1032   On a rename we need to do an extra ldb_modify which sets the
1033   whenChanged and uSNChanged attributes
1034  */
1035 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
1036 {
1037         struct ldb_context *ldb;
1038         int ret, i;
1039         time_t t = time(NULL);
1040         uint64_t seq_num = 0;
1041         struct ldb_message *msg;
1042         struct replmd_private *replmd_private = 
1043                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1044
1045         /* do not manipulate our control entries */
1046         if (ldb_dn_is_special(req->op.mod.message->dn)) {
1047                 return ldb_next_request(module, req);
1048         }
1049
1050         ldb = ldb_module_get_ctx(module);
1051
1052         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
1053
1054         /* Get a sequence number from the backend */
1055         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1056         if (ret != LDB_SUCCESS) {
1057                 return ret;
1058         }
1059
1060         msg = ldb_msg_new(req);
1061         if (msg == NULL) {
1062                 ldb_oom(ldb);
1063                 return LDB_ERR_OPERATIONS_ERROR;
1064         }
1065
1066         msg->dn = req->op.rename.olddn;
1067
1068         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1069                 talloc_free(msg);
1070                 return LDB_ERR_OPERATIONS_ERROR;
1071         }
1072         msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
1073
1074         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
1075                 talloc_free(msg);
1076                 return LDB_ERR_OPERATIONS_ERROR;
1077         }
1078         msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
1079
1080         ret = ldb_modify(ldb, msg);
1081         talloc_free(msg);
1082         if (ret != LDB_SUCCESS) {
1083                 return ret;
1084         }
1085
1086         ret = replmd_load_NCs(module);
1087         if (ret != 0) {
1088                 return ret;
1089         }
1090
1091         /* now update the highest uSNs of the partitions that are
1092            affected. Note that two partitions could be changing */
1093         for (i=0; i<replmd_private->num_ncs; i++) {
1094                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1095                                         req->op.rename.olddn) == 0) {
1096                         break;
1097                 }
1098         }
1099         if (i == replmd_private->num_ncs) {
1100                 DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
1101                          ldb_dn_get_linearized(req->op.rename.olddn)));
1102                 return LDB_ERR_OPERATIONS_ERROR;
1103         }
1104         replmd_private->ncs[i].mod_usn = seq_num;
1105
1106         for (i=0; i<replmd_private->num_ncs; i++) {
1107                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1108                                         req->op.rename.newdn) == 0) {
1109                         break;
1110                 }
1111         }
1112         if (i == replmd_private->num_ncs) {
1113                 DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
1114                          ldb_dn_get_linearized(req->op.rename.newdn)));
1115                 return LDB_ERR_OPERATIONS_ERROR;
1116         }
1117         replmd_private->ncs[i].mod_usn = seq_num;
1118         
1119         /* go on with the call chain */
1120         return ldb_next_request(module, req);
1121 }
1122
1123
/*
 * Turn an ldb error code hit while applying replicated objects into
 * the code handed back to the caller.  This is an identity mapping;
 * the request context 'ar' is not consulted.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        (void)ar;

        return ret;
}
1128
1129 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1130 {
1131         int ret = LDB_ERR_OTHER;
1132         /* TODO: do some error mapping */
1133         return ret;
1134 }
1135
1136 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1137
1138 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1139                                                 struct ldb_reply *ares)
1140 {
1141         struct ldb_context *ldb;
1142         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1143                                                struct replmd_replicated_request);
1144         int ret;
1145
1146         ldb = ldb_module_get_ctx(ar->module);
1147
1148         if (!ares) {
1149                 return ldb_module_done(ar->req, NULL, NULL,
1150                                         LDB_ERR_OPERATIONS_ERROR);
1151         }
1152         if (ares->error != LDB_SUCCESS) {
1153                 return ldb_module_done(ar->req, ares->controls,
1154                                         ares->response, ares->error);
1155         }
1156
1157         if (ares->type != LDB_REPLY_DONE) {
1158                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1159                 return ldb_module_done(ar->req, NULL, NULL,
1160                                         LDB_ERR_OPERATIONS_ERROR);
1161         }
1162
1163         talloc_free(ares);
1164         ar->index_current++;
1165
1166         ret = replmd_replicated_apply_next(ar);
1167         if (ret != LDB_SUCCESS) {
1168                 return ldb_module_done(ar->req, NULL, NULL, ret);
1169         }
1170
1171         return LDB_SUCCESS;
1172 }
1173
1174 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1175 {
1176         struct ldb_context *ldb;
1177         struct ldb_request *change_req;
1178         enum ndr_err_code ndr_err;
1179         struct ldb_message *msg;
1180         struct replPropertyMetaDataBlob *md;
1181         struct ldb_val md_value;
1182         uint32_t i;
1183         uint64_t seq_num;
1184         int ret;
1185
1186         /*
1187          * TODO: check if the parent object exist
1188          */
1189
1190         /*
1191          * TODO: handle the conflict case where an object with the
1192          *       same name exist
1193          */
1194
1195         ldb = ldb_module_get_ctx(ar->module);
1196         msg = ar->objs->objects[ar->index_current].msg;
1197         md = ar->objs->objects[ar->index_current].meta_data;
1198
1199         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1200         if (ret != LDB_SUCCESS) {
1201                 return replmd_replicated_request_error(ar, ret);
1202         }
1203
1204         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1205         if (ret != LDB_SUCCESS) {
1206                 return replmd_replicated_request_error(ar, ret);
1207         }
1208
1209         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1210         if (ret != LDB_SUCCESS) {
1211                 return replmd_replicated_request_error(ar, ret);
1212         }
1213
1214         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1215         if (ret != LDB_SUCCESS) {
1216                 return replmd_replicated_request_error(ar, ret);
1217         }
1218
1219         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1220         if (ret != LDB_SUCCESS) {
1221                 return replmd_replicated_request_error(ar, ret);
1222         }
1223
1224         ret = replmd_notify(ar->module, msg->dn, seq_num);
1225         if (ret != LDB_SUCCESS) {
1226                 return replmd_replicated_request_error(ar, ret);
1227         }
1228
1229         /* remove any message elements that have zero values */
1230         for (i=0; i<msg->num_elements; i++) {
1231                 if (msg->elements[i].num_values == 0) {
1232                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1233                                  msg->elements[i].name));
1234                         memmove(&msg->elements[i], 
1235                                 &msg->elements[i+1], 
1236                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1237                         msg->num_elements--;
1238                         i--;
1239                 }
1240         }
1241         
1242         /*
1243          * the meta data array is already sorted by the caller
1244          */
1245         for (i=0; i < md->ctr.ctr1.count; i++) {
1246                 md->ctr.ctr1.array[i].local_usn = seq_num;
1247         }
1248         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1249                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1250                                        md,
1251                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1252         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1253                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1254                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1255         }
1256         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1257         if (ret != LDB_SUCCESS) {
1258                 return replmd_replicated_request_error(ar, ret);
1259         }
1260
1261         replmd_ldb_message_sort(msg, ar->schema);
1262
1263         if (DEBUGLVL(4)) {
1264                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1265                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1266                 talloc_free(s);
1267         }
1268
1269         ret = ldb_build_add_req(&change_req,
1270                                 ldb,
1271                                 ar,
1272                                 msg,
1273                                 ar->controls,
1274                                 ar,
1275                                 replmd_replicated_apply_add_callback,
1276                                 ar->req);
1277         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1278
1279         return ldb_next_request(ar->module, change_req);
1280 }
1281
1282 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1283                                                          struct replPropertyMetaData1 *m2)
1284 {
1285         int ret;
1286
1287         if (m1->version != m2->version) {
1288                 return m1->version - m2->version;
1289         }
1290
1291         if (m1->originating_change_time != m2->originating_change_time) {
1292                 return m1->originating_change_time - m2->originating_change_time;
1293         }
1294
1295         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1296         if (ret != 0) {
1297                 return ret;
1298         }
1299
1300         return m1->originating_usn - m2->originating_usn;
1301 }
1302
1303 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1304                                                   struct ldb_reply *ares)
1305 {
1306         struct ldb_context *ldb;
1307         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1308                                                struct replmd_replicated_request);
1309         int ret;
1310
1311         ldb = ldb_module_get_ctx(ar->module);
1312
1313         if (!ares) {
1314                 return ldb_module_done(ar->req, NULL, NULL,
1315                                         LDB_ERR_OPERATIONS_ERROR);
1316         }
1317         if (ares->error != LDB_SUCCESS) {
1318                 return ldb_module_done(ar->req, ares->controls,
1319                                         ares->response, ares->error);
1320         }
1321
1322         if (ares->type != LDB_REPLY_DONE) {
1323                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1324                 return ldb_module_done(ar->req, NULL, NULL,
1325                                         LDB_ERR_OPERATIONS_ERROR);
1326         }
1327
1328         talloc_free(ares);
1329         ar->index_current++;
1330
1331         ret = replmd_replicated_apply_next(ar);
1332         if (ret != LDB_SUCCESS) {
1333                 return ldb_module_done(ar->req, NULL, NULL, ret);
1334         }
1335
1336         return LDB_SUCCESS;
1337 }
1338
1339 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1340 {
1341         struct ldb_context *ldb;
1342         struct ldb_request *change_req;
1343         enum ndr_err_code ndr_err;
1344         struct ldb_message *msg;
1345         struct replPropertyMetaDataBlob *rmd;
1346         struct replPropertyMetaDataBlob omd;
1347         const struct ldb_val *omd_value;
1348         struct replPropertyMetaDataBlob nmd;
1349         struct ldb_val nmd_value;
1350         uint32_t i,j,ni=0;
1351         uint32_t removed_attrs = 0;
1352         uint64_t seq_num;
1353         int ret;
1354
1355         ldb = ldb_module_get_ctx(ar->module);
1356         msg = ar->objs->objects[ar->index_current].msg;
1357         rmd = ar->objs->objects[ar->index_current].meta_data;
1358         ZERO_STRUCT(omd);
1359         omd.version = 1;
1360
1361         /*
1362          * TODO: check repl data is correct after a rename
1363          */
1364         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1365                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1366                           ldb_dn_get_linearized(ar->search_msg->dn),
1367                           ldb_dn_get_linearized(msg->dn));
1368                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1369                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1370                                   ldb_dn_get_linearized(ar->search_msg->dn),
1371                                   ldb_dn_get_linearized(msg->dn),
1372                                   ldb_errstring(ldb));
1373                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1374                 }
1375         }
1376
1377         /* find existing meta data */
1378         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1379         if (omd_value) {
1380                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1381                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1382                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1383                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1384                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1385                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1386                 }
1387
1388                 if (omd.version != 1) {
1389                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1390                 }
1391         }
1392
1393         ZERO_STRUCT(nmd);
1394         nmd.version = 1;
1395         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1396         nmd.ctr.ctr1.array = talloc_array(ar,
1397                                           struct replPropertyMetaData1,
1398                                           nmd.ctr.ctr1.count);
1399         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1400
1401         /* first copy the old meta data */
1402         for (i=0; i < omd.ctr.ctr1.count; i++) {
1403                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1404                 ni++;
1405         }
1406
1407         /* now merge in the new meta data */
1408         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1409                 bool found = false;
1410
1411                 for (j=0; j < ni; j++) {
1412                         int cmp;
1413
1414                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1415                                 continue;
1416                         }
1417
1418                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1419                                                                             &nmd.ctr.ctr1.array[j]);
1420                         if (cmp > 0) {
1421                                 /* replace the entry */
1422                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1423                                 found = true;
1424                                 break;
1425                         }
1426
1427                         /* we don't want to apply this change so remove the attribute */
1428                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1429                         removed_attrs++;
1430
1431                         found = true;
1432                         break;
1433                 }
1434
1435                 if (found) continue;
1436
1437                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1438                 ni++;
1439         }
1440
1441         /*
1442          * finally correct the size of the meta_data array
1443          */
1444         nmd.ctr.ctr1.count = ni;
1445
1446         /*
1447          * the rdn attribute (the alias for the name attribute),
1448          * 'cn' for most objects is the last entry in the meta data array
1449          * we have stored
1450          *
1451          * sort the new meta data array
1452          */
1453         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
1454         if (ret != LDB_SUCCESS) {
1455                 return ret;
1456         }
1457
1458         /*
1459          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1460          */
1461         if (msg->num_elements == 0) {
1462                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1463                           ar->index_current);
1464
1465                 ar->index_current++;
1466                 return replmd_replicated_apply_next(ar);
1467         }
1468
1469         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1470                   ar->index_current, msg->num_elements);
1471
1472         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1473         if (ret != LDB_SUCCESS) {
1474                 return replmd_replicated_request_error(ar, ret);
1475         }
1476
1477         for (i=0; i<ni; i++) {
1478                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1479         }
1480
1481         /* create the meta data value */
1482         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1483                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1484                                        &nmd,
1485                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1486         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1487                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1488                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1489         }
1490
1491         /*
1492          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1493          * and replPopertyMetaData attributes
1494          */
1495         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1496         if (ret != LDB_SUCCESS) {
1497                 return replmd_replicated_request_error(ar, ret);
1498         }
1499         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1500         if (ret != LDB_SUCCESS) {
1501                 return replmd_replicated_request_error(ar, ret);
1502         }
1503         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1504         if (ret != LDB_SUCCESS) {
1505                 return replmd_replicated_request_error(ar, ret);
1506         }
1507
1508         replmd_ldb_message_sort(msg, ar->schema);
1509
1510         /* we want to replace the old values */
1511         for (i=0; i < msg->num_elements; i++) {
1512                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1513         }
1514
1515         ret = replmd_notify(ar->module, msg->dn, seq_num);
1516         if (ret != LDB_SUCCESS) {
1517                 return replmd_replicated_request_error(ar, ret);
1518         }
1519
1520         if (DEBUGLVL(4)) {
1521                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1522                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1523                 talloc_free(s);
1524         }
1525
1526         ret = ldb_build_mod_req(&change_req,
1527                                 ldb,
1528                                 ar,
1529                                 msg,
1530                                 ar->controls,
1531                                 ar,
1532                                 replmd_replicated_apply_merge_callback,
1533                                 ar->req);
1534         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1535
1536         return ldb_next_request(ar->module, change_req);
1537 }
1538
1539 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1540                                                    struct ldb_reply *ares)
1541 {
1542         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1543                                                struct replmd_replicated_request);
1544         int ret;
1545
1546         if (!ares) {
1547                 return ldb_module_done(ar->req, NULL, NULL,
1548                                         LDB_ERR_OPERATIONS_ERROR);
1549         }
1550         if (ares->error != LDB_SUCCESS &&
1551             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1552                 return ldb_module_done(ar->req, ares->controls,
1553                                         ares->response, ares->error);
1554         }
1555
1556         switch (ares->type) {
1557         case LDB_REPLY_ENTRY:
1558                 ar->search_msg = talloc_steal(ar, ares->message);
1559                 break;
1560
1561         case LDB_REPLY_REFERRAL:
1562                 /* we ignore referrals */
1563                 break;
1564
1565         case LDB_REPLY_DONE:
1566                 if (ar->search_msg != NULL) {
1567                         ret = replmd_replicated_apply_merge(ar);
1568                 } else {
1569                         ret = replmd_replicated_apply_add(ar);
1570                 }
1571                 if (ret != LDB_SUCCESS) {
1572                         return ldb_module_done(ar->req, NULL, NULL, ret);
1573                 }
1574         }
1575
1576         talloc_free(ares);
1577         return LDB_SUCCESS;
1578 }
1579
1580 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1581
1582 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1583 {
1584         struct ldb_context *ldb;
1585         int ret;
1586         char *tmp_str;
1587         char *filter;
1588         struct ldb_request *search_req;
1589
1590         if (ar->index_current >= ar->objs->num_objects) {
1591                 /* done with it, go to next stage */
1592                 return replmd_replicated_uptodate_vector(ar);
1593         }
1594
1595         ldb = ldb_module_get_ctx(ar->module);
1596         ar->search_msg = NULL;
1597
1598         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1599         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1600
1601         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1602         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1603         talloc_free(tmp_str);
1604
1605         ret = ldb_build_search_req(&search_req,
1606                                    ldb,
1607                                    ar,
1608                                    ar->objs->partition_dn,
1609                                    LDB_SCOPE_SUBTREE,
1610                                    filter,
1611                                    NULL,
1612                                    NULL,
1613                                    ar,
1614                                    replmd_replicated_apply_search_callback,
1615                                    ar->req);
1616
1617         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
1618         if (ret != LDB_SUCCESS) {
1619                 return ret;
1620         }
1621         
1622
1623         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1624
1625         return ldb_next_request(ar->module, search_req);
1626 }
1627
1628 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1629                                                       struct ldb_reply *ares)
1630 {
1631         struct ldb_context *ldb;
1632         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1633                                                struct replmd_replicated_request);
1634         ldb = ldb_module_get_ctx(ar->module);
1635
1636         if (!ares) {
1637                 return ldb_module_done(ar->req, NULL, NULL,
1638                                         LDB_ERR_OPERATIONS_ERROR);
1639         }
1640         if (ares->error != LDB_SUCCESS) {
1641                 return ldb_module_done(ar->req, ares->controls,
1642                                         ares->response, ares->error);
1643         }
1644
1645         if (ares->type != LDB_REPLY_DONE) {
1646                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1647                 return ldb_module_done(ar->req, NULL, NULL,
1648                                         LDB_ERR_OPERATIONS_ERROR);
1649         }
1650
1651         talloc_free(ares);
1652
1653         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1654 }
1655
/*
 * Build and send the modify request that records the outcome of a
 * replication cycle on the object found in ar->search_msg.
 *
 * Two attributes are rewritten, both with LDB_FLAG_MOD_REPLACE:
 *
 *  - replUpToDateVector: the cursors of the stored vector (if any), merged
 *    with the cursors of ar->objs->uptodateness_vector (if any), plus one
 *    cursor for the source DSA carrying highwatermark.tmp_highest_usn and
 *    the current time. The merged cursors are sorted before being pushed.
 *
 *  - repsFrom: a version 1 value built from *ar->objs->source_dsa with the
 *    failure counter reset, last_success/last_attempt set to now and
 *    highest_usn set to tmp_highest_usn. It overwrites the stored value
 *    with the same source_dsa_obj_guid, or is appended if none matches.
 *
 * The request is sent to the next module with ar->controls; completion is
 * reported to replmd_replicated_uptodate_modify_callback().
 *
 * Returns the result of ldb_next_request(), or the value produced by
 * replmd_replicated_request_werror()/replmd_replicated_request_error()
 * when allocation, NDR (un)marshalling or request construction fails.
 */
1656 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1657 {
1658         struct ldb_context *ldb;
1659         struct ldb_request *change_req;
1660         enum ndr_err_code ndr_err;
1661         struct ldb_message *msg;
1662         struct replUpToDateVectorBlob ouv;
1663         const struct ldb_val *ouv_value;
1664         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1665         struct replUpToDateVectorBlob nuv;
1666         struct ldb_val nuv_value;
1667         struct ldb_message_element *nuv_el = NULL;
1668         const struct GUID *our_invocation_id;
1669         struct ldb_message_element *orf_el = NULL;
1670         struct repsFromToBlob nrf;
1671         struct ldb_val *nrf_value = NULL;
1672         struct ldb_message_element *nrf_el = NULL;
1673         uint32_t i,j,ni=0;
1674         bool found = false;
1675         time_t t = time(NULL);
1676         NTTIME now;
1677         int ret;
1678
             /*
              * naming used below: ouv = old (stored) vector, ruv = vector
              * received with the replicated objects, nuv = new vector;
              * ni counts the cursors filled in nuv so far
              */
1679         ldb = ldb_module_get_ctx(ar->module);
1680         ruv = ar->objs->uptodateness_vector;
1681         ZERO_STRUCT(ouv);
1682         ouv.version = 2;
1683         ZERO_STRUCT(nuv);
1684         nuv.version = 2;
1685
1686         unix_to_nt_time(&now, t);
1687
1688         /*
1689          * first create the new replUpToDateVector
1690          */
1691         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1692         if (ouv_value) {
1693                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1694                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1695                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1696                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1697                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1698                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1699                 }
1700
                     /* only version 2 blobs are handled; the code below reads ctr.ctr2 */
1701                 if (ouv.version != 2) {
1702                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1703                 }
1704         }
1705
1706         /*
1707          * the new uptodateness vector will at least
1708          * contain 1 entry, one for the source_dsa
1709          *
1710          * plus optional values from our old vector and the one from the source_dsa
1711          */
             /*
              * this count is an upper bound used to size the array; it is
              * corrected to the number of cursors actually used further down
              */
1712         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1713         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1714         nuv.ctr.ctr2.cursors = talloc_array(ar,
1715                                             struct drsuapi_DsReplicaCursor2,
1716                                             nuv.ctr.ctr2.count);
1717         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1718
1719         /* first copy the old vector */
1720         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1721                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1722                 ni++;
1723         }
1724
1725         /* get our invocation_id if we have one already attached to the ldb */
1726         our_invocation_id = samdb_ntds_invocation_id(ldb);
1727
1728         /* merge in the source_dsa vector if available */
1729         for (i=0; (ruv && i < ruv->count); i++) {
1730                 found = false;
1731
                     /* a cursor describing ourselves is never copied into the new vector */
1732                 if (our_invocation_id &&
1733                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1734                                our_invocation_id)) {
1735                         continue;
1736                 }
1737
1738                 for (j=0; j < ni; j++) {
1739                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1740                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1741                                 continue;
1742                         }
1743
1744                         found = true;
1745
1746                         /*
1747                          * we update only the highest_usn and not the latest_sync_success time,
1748                          * because the last success stands for direct replication
1749                          */
1750                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1751                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1752                         }
1753                         break;                  
1754                 }
1755
1756                 if (found) continue;
1757
1758                 /* if it's not there yet, add it */
1759                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1760                 ni++;
1761         }
1762
1763         /*
1764          * merge in the current highwatermark for the source_dsa
1765          */
1766         found = false;
1767         for (j=0; j < ni; j++) {
1768                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1769                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1770                         continue;
1771                 }
1772
1773                 found = true;
1774
1775                 /*
1776                  * here we update the highest_usn and last_sync_success time
1777                  * because we're directly replicating from the source_dsa
1778                  *
1779                  * and use the tmp_highest_usn because this is what we have just applied
1780                  * to our ldb
1781                  */
1782                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1783                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1784                 break;
1785         }
1786         if (!found) {
1787                 /*
1788                  * here we update the highest_usn and last_sync_success time
1789                  * because we're directly replicating from the source_dsa
1790                  *
1791                  * and use the tmp_highest_usn because this is what we have just applied
1792                  * to our ldb
1793                  */
1794                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1795                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1796                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1797                 ni++;
1798         }
1799
1800         /*
1801          * finally correct the size of the cursors array
1802          */
1803         nuv.ctr.ctr2.count = ni;
1804
1805         /*
1806          * sort the cursors
1807          */
1808         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1809               sizeof(struct drsuapi_DsReplicaCursor2),
1810               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1811
1812         /*
1813          * create the change ldb_message
1814          */
1815         msg = ldb_msg_new(ar);
1816         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
             /* the modify targets the same object the preceding search returned */
1817         msg->dn = ar->search_msg->dn;
1818
1819         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1820                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1821                                        &nuv,
1822                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1823         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1824                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1825                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1826         }
1827         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1828         if (ret != LDB_SUCCESS) {
1829                 return replmd_replicated_request_error(ar, ret);
1830         }
1831         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1832
1833         /*
1834          * now create the new repsFrom value from the given repsFromTo1 structure
1835          */
1836         ZERO_STRUCT(nrf);
1837         nrf.version                                     = 1;
1838         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1839         /* and fix some values... */
1840         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1841         nrf.ctr.ctr1.last_success                       = now;
1842         nrf.ctr.ctr1.last_attempt                       = now;
1843         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1844         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1845
1846         /*
1847          * first see if we already have a repsFrom value for the current source dsa
1848          * if so we'll later replace this value
1849          */
1850         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1851         if (orf_el) {
1852                 for (i=0; i < orf_el->num_values; i++) {
1853                         struct repsFromToBlob *trf;
1854
                             /* trf is scratch space for decoding one stored value;
                                it also serves as the memory context of the pull */
1855                         trf = talloc(ar, struct repsFromToBlob);
1856                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1857
1858                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1859                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1860                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1861                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1862                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1863                         }
1864
1865                         if (trf->version != 1) {
1866                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1867                         }
1868
1869                         /*
1870                          * we compare the source dsa objectGUID not the invocation_id
1871                          * because we want only one repsFrom value per source dsa
1872                          * and when the invocation_id of the source dsa has changed we don't need 
1873                          * the old repsFrom with the old invocation_id
1874                          */
1875                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1876                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1877                                 talloc_free(trf);
1878                                 continue;
1879                         }
1880
                             /* match: remember the slot so it gets overwritten in place below */
1881                         talloc_free(trf);
1882                         nrf_value = &orf_el->values[i];
1883                         break;
1884                 }
1885
1886                 /*
1887                  * copy over all old values to the new ldb_message
1888                  */
1889                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1890                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
                     /*
                      * struct copy: the new element now refers to the same values
                      * array as the element of the search result, so a nrf_value
                      * chosen in the loop above points into the new element as well
                      */
1891                 *nrf_el = *orf_el;
1892         }
1893
1894         /*
1895          * if we haven't found an old repsFrom value for the current source dsa
1896          * we'll add a new value
1897          */
1898         if (!nrf_value) {
1899                 struct ldb_val zero_value;
1900                 ZERO_STRUCT(zero_value);
1901                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1902                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1903
                     /* presumably the empty placeholder just added by
                        ldb_msg_add_value() is the last value of the element */
1904                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1905         }
1906
1907         /* we now fill the value which is already attached to ldb_message */
1908         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1909                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1910                                        &nrf,
1911                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1912         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1913                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1914                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1915         }
1916
1917         /* 
1918          * the ldb_message_element for the attribute, has all the old values and the new one
1919          * so we'll replace the whole attribute with all values
1920          */
1921         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1922
             /* the LDIF dump is only built when debug level 4 is active */
1923         if (DEBUGLVL(4)) {
1924                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1925                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1926                 talloc_free(s);
1927         }
1928
1929         /* prepare the ldb_modify() request */
1930         ret = ldb_build_mod_req(&change_req,
1931                                 ldb,
1932                                 ar,
1933                                 msg,
1934                                 ar->controls,
1935                                 ar,
1936                                 replmd_replicated_uptodate_modify_callback,
1937                                 ar->req);
1938         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1939
1940         return ldb_next_request(ar->module, change_req);
1941 }
1942
1943 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1944                                                       struct ldb_reply *ares)
1945 {
1946         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1947                                                struct replmd_replicated_request);
1948         int ret;
1949
1950         if (!ares) {
1951                 return ldb_module_done(ar->req, NULL, NULL,
1952                                         LDB_ERR_OPERATIONS_ERROR);
1953         }
1954         if (ares->error != LDB_SUCCESS &&
1955             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1956                 return ldb_module_done(ar->req, ares->controls,
1957                                         ares->response, ares->error);
1958         }
1959
1960         switch (ares->type) {
1961         case LDB_REPLY_ENTRY:
1962                 ar->search_msg = talloc_steal(ar, ares->message);
1963                 break;
1964
1965         case LDB_REPLY_REFERRAL:
1966                 /* we ignore referrals */
1967                 break;
1968
1969         case LDB_REPLY_DONE:
1970                 if (ar->search_msg == NULL) {
1971                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1972                 } else {
1973                         ret = replmd_replicated_uptodate_modify(ar);
1974                 }
1975                 if (ret != LDB_SUCCESS) {
1976                         return ldb_module_done(ar->req, NULL, NULL, ret);
1977                 }
1978         }
1979
1980         talloc_free(ares);
1981         return LDB_SUCCESS;
1982 }
1983
1984
1985 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1986 {
1987         struct ldb_context *ldb;
1988         int ret;
1989         static const char *attrs[] = {
1990                 "replUpToDateVector",
1991                 "repsFrom",
1992                 NULL
1993         };
1994         struct ldb_request *search_req;
1995
1996         ldb = ldb_module_get_ctx(ar->module);
1997         ar->search_msg = NULL;
1998
1999         ret = ldb_build_search_req(&search_req,
2000                                    ldb,
2001                                    ar,
2002                                    ar->objs->partition_dn,
2003                                    LDB_SCOPE_BASE,
2004                                    "(objectClass=*)",
2005                                    attrs,
2006                                    NULL,
2007                                    ar,
2008                                    replmd_replicated_uptodate_search_callback,
2009                                    ar->req);
2010         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2011
2012         return ldb_next_request(ar->module, search_req);
2013 }
2014
2015
2016
2017 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
2018 {
2019         struct ldb_context *ldb;
2020         struct dsdb_extended_replicated_objects *objs;
2021         struct replmd_replicated_request *ar;
2022         struct ldb_control **ctrls;
2023         int ret, i;
2024         struct dsdb_control_current_partition *partition_ctrl;
2025         struct replmd_private *replmd_private = 
2026                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2027
2028         ldb = ldb_module_get_ctx(module);
2029
2030         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
2031
2032         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
2033         if (!objs) {
2034                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
2035                 return LDB_ERR_PROTOCOL_ERROR;
2036         }
2037
2038         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
2039                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
2040                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
2041                 return LDB_ERR_PROTOCOL_ERROR;
2042         }
2043
2044         ar = replmd_ctx_init(module, req);
2045         if (!ar)
2046                 return LDB_ERR_OPERATIONS_ERROR;
2047
2048         ar->objs = objs;
2049         ar->schema = dsdb_get_schema(ldb);
2050         if (!ar->schema) {
2051                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
2052                 talloc_free(ar);
2053                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
2054                 return LDB_ERR_CONSTRAINT_VIOLATION;
2055         }
2056
2057         ctrls = req->controls;
2058
2059         if (req->controls) {
2060                 req->controls = talloc_memdup(ar, req->controls,
2061                                               talloc_get_size(req->controls));
2062                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2063         }
2064
2065         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
2066         if (ret != LDB_SUCCESS) {
2067                 return ret;
2068         }
2069
2070         /*
2071           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
2072           tells the partition module which partition this request is
2073           directed at. That is important as the partition roots appear
2074           twice in the directory, once as mount points in the top
2075           level store, and once as the roots of each partition. The
2076           replication code wants to operate on the root of the
2077           partitions, not the top level mount points
2078          */
2079         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
2080         if (partition_ctrl == NULL) {
2081                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2082         }
2083         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
2084         partition_ctrl->dn = objs->partition_dn;
2085
2086         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
2087         if (ret != LDB_SUCCESS) {
2088                 return ret;
2089         }
2090
2091         ar->controls = req->controls;
2092         req->controls = ctrls;
2093
2094         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
2095
2096         /* save away the linked attributes for the end of the
2097            transaction */
2098         for (i=0; i<ar->objs->linked_attributes_count; i++) {
2099                 struct la_entry *la_entry;
2100
2101                 if (replmd_private->la_ctx == NULL) {
2102                         replmd_private->la_ctx = talloc_new(replmd_private);
2103                 }
2104                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
2105                 if (la_entry == NULL) {
2106                         ldb_oom(ldb);
2107                         return LDB_ERR_OPERATIONS_ERROR;
2108                 }
2109                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2110                 if (la_entry->la == NULL) {
2111                         talloc_free(la_entry);
2112                         ldb_oom(ldb);
2113                         return LDB_ERR_OPERATIONS_ERROR;
2114                 }
2115                 *la_entry->la = ar->objs->linked_attributes[i];
2116
2117                 /* we need to steal the non-scalars so they stay
2118                    around until the end of the transaction */
2119                 talloc_steal(la_entry->la, la_entry->la->identifier);
2120                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2121
2122                 DLIST_ADD(replmd_private->la_list, la_entry);
2123         }
2124
2125         return replmd_replicated_apply_next(ar);
2126 }
2127
2128 /*
2129   process one linked attribute structure
2130  */
2131 static int replmd_process_linked_attribute(struct ldb_module *module,
2132                                            struct la_entry *la_entry)
2133 {                                          
2134         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2135         struct ldb_context *ldb = ldb_module_get_ctx(module);
2136         struct drsuapi_DsReplicaObjectIdentifier3 target;
2137         struct ldb_message *msg;
2138         struct ldb_message_element *ret_el;
2139         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2140         enum ndr_err_code ndr_err;
2141         struct ldb_request *mod_req;
2142         int ret;
2143         const struct dsdb_attribute *attr;
2144         struct ldb_dn *target_dn;
2145         uint64_t seq_num = 0;
2146
2147 /*
2148 linked_attributes[0]:                                                     
2149      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2150         identifier               : *                                      
2151             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2152                 __ndr_size               : 0x0000003a (58)                
2153                 __ndr_size_sid           : 0x00000000 (0)                 
2154                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2155                 sid                      : S-0-0                               
2156                 __ndr_size_dn            : 0x00000000 (0)                      
2157                 dn                       : ''                                  
2158         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2159         value: struct drsuapi_DsAttributeValue                                 
2160             __ndr_size               : 0x0000007e (126)                        
2161             blob                     : *                                       
2162                 blob                     : DATA_BLOB length=126                
2163         flags                    : 0x00000001 (1)                              
2164                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2165         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2166         meta_data: struct drsuapi_DsReplicaMetaData                            
2167             version                  : 0x00000015 (21)                         
2168             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2169             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2170             originating_usn          : 0x000000000001e19c (123292)             
2171      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2172         __ndr_size               : 0x0000007e (126)                            
2173         __ndr_size_sid           : 0x0000001c (28)                             
2174         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2175         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2176         __ndr_size_dn            : 0x00000022 (34)                                
2177         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2178  */
2179         if (DEBUGLVL(4)) {
2180                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2181         }
2182         
2183         /* decode the target of the link */
2184         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2185                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2186                                        &target,
2187                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2188         if (ndr_err != NDR_ERR_SUCCESS) {
2189                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2190                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2191                 talloc_free(tmp_ctx);
2192                 return LDB_ERR_OPERATIONS_ERROR;
2193         }
2194         if (DEBUGLVL(4)) {
2195                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2196         }
2197
2198         /* construct a modify request for this attribute change */
2199         msg = ldb_msg_new(tmp_ctx);
2200         if (!msg) {
2201                 ldb_oom(ldb);
2202                 talloc_free(tmp_ctx);
2203                 return LDB_ERR_OPERATIONS_ERROR;
2204         }
2205
2206         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2207                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2208         if (ret != LDB_SUCCESS) {
2209                 talloc_free(tmp_ctx);
2210                 return ret;
2211         }
2212
2213         /* find the attribute being modified */
2214         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2215         if (attr == NULL) {
2216                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2217                 talloc_free(tmp_ctx);
2218                 return LDB_ERR_OPERATIONS_ERROR;
2219         }
2220
2221         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2222                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2223                                         LDB_FLAG_MOD_ADD, &ret_el);
2224         } else {
2225                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2226                                         LDB_FLAG_MOD_DELETE, &ret_el);
2227         }
2228         if (ret != LDB_SUCCESS) {
2229                 talloc_free(tmp_ctx);
2230                 return ret;
2231         }
2232         /* we allocate two entries here, in case we need a remove/add
2233            pair */
2234         ret_el->values = talloc_array(msg, struct ldb_val, 2);
2235         if (!ret_el->values) {
2236                 ldb_oom(ldb);
2237                 talloc_free(tmp_ctx);
2238                 return LDB_ERR_OPERATIONS_ERROR;
2239         }
2240         ret_el->num_values = 1;
2241
2242         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, GUID_string(tmp_ctx, &target.guid), &target_dn);
2243         if (ret != LDB_SUCCESS) {
2244                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
2245                 talloc_free(tmp_ctx);
2246                 return LDB_ERR_OPERATIONS_ERROR;
2247         }
2248
2249         ret_el->values[0].data = (uint8_t *)ldb_dn_get_extended_linearized(tmp_ctx, target_dn, 1);
2250         ret_el->values[0].length = strlen((char *)ret_el->values[0].data);
2251
2252         ret = replmd_update_rpmd(module, msg, &seq_num);
2253         if (ret != LDB_SUCCESS) {
2254                 talloc_free(tmp_ctx);
2255                 return ret;
2256         }
2257
2258         /* we only change whenChanged and uSNChanged if the seq_num
2259            has changed */
2260         if (seq_num != 0) {
2261                 time_t t = time(NULL);
2262
2263                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
2264                         talloc_free(tmp_ctx);
2265                         return LDB_ERR_OPERATIONS_ERROR;
2266                 }
2267
2268                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
2269                         talloc_free(tmp_ctx);
2270                         return LDB_ERR_OPERATIONS_ERROR;
2271                 }
2272         }
2273
2274         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2275                                 msg,
2276                                 NULL,
2277                                 NULL, 
2278                                 ldb_op_default_callback,
2279                                 NULL);
2280         if (ret != LDB_SUCCESS) {
2281                 talloc_free(tmp_ctx);
2282                 return ret;
2283         }
2284         talloc_steal(mod_req, msg);
2285
2286         if (DEBUGLVL(4)) {
2287                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2288                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2289         }
2290
2291         /* Run the new request */
2292         ret = ldb_next_request(module, mod_req);
2293
2294         /* we need to wait for this to finish, as we are being called
2295            from the synchronous end_transaction hook of this module */
2296         if (ret == LDB_SUCCESS) {
2297                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2298         }
2299
2300         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2301                 /* the link destination exists, we need to update it
2302                  * by deleting the old one for the same DN then adding
2303                  * the new one */
2304                 msg->elements = talloc_realloc(msg, msg->elements,
2305                                                struct ldb_message_element,
2306                                                msg->num_elements+1);
2307                 if (msg->elements == NULL) {
2308                         ldb_oom(ldb);
2309                         talloc_free(tmp_ctx);
2310                         return LDB_ERR_OPERATIONS_ERROR;
2311                 }
2312                 /* this relies on the backend matching the old entry
2313                    only by the DN portion of the extended DN */
2314                 msg->elements[1] = msg->elements[0];
2315                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2316                 msg->num_elements++;
2317
2318                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2319                                         msg,
2320                                         NULL,
2321                                         NULL, 
2322                                         ldb_op_default_callback,
2323                                         NULL);
2324                 if (ret != LDB_SUCCESS) {
2325                         talloc_free(tmp_ctx);
2326                         return ret;
2327                 }
2328
2329                 /* Run the new request */
2330                 ret = ldb_next_request(module, mod_req);
2331                 
2332                 if (ret == LDB_SUCCESS) {
2333                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2334                 }
2335         }
2336
2337         if (ret != LDB_SUCCESS) {
2338                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2339                           ldb_errstring(ldb),
2340                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2341                 ret = LDB_SUCCESS;
2342         }
2343         
2344         talloc_free(tmp_ctx);
2345
2346         return ret;     
2347 }
2348
2349 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2350 {
2351         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2352                 return replmd_extended_replicated_objects(module, req);
2353         }
2354
2355         return ldb_next_request(module, req);
2356 }
2357
2358
2359 /*
2360   we hook into the transaction operations to allow us to 
2361   perform the linked attribute updates at the end of the whole
2362   transaction. This allows a forward linked attribute to be created
2363   before the object is created. During a vampire, w2k8 sends us linked
2364   attributes before the objects they are part of.
2365  */
2366 static int replmd_start_transaction(struct ldb_module *module)
2367 {
2368         /* create our private structure for this transaction */
2369         int i;
2370         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2371                                                                 struct replmd_private);
2372         talloc_free(replmd_private->la_ctx);
2373         replmd_private->la_list = NULL;
2374         replmd_private->la_ctx = NULL;
2375
2376         for (i=0; i<replmd_private->num_ncs; i++) {
2377                 replmd_private->ncs[i].mod_usn = 0;
2378         }
2379
2380         return ldb_next_start_trans(module);
2381 }
2382
2383 /*
2384   on prepare commit we loop over our queued la_context structures and
2385   apply each of them  
2386  */
2387 static int replmd_prepare_commit(struct ldb_module *module)
2388 {
2389         struct replmd_private *replmd_private = 
2390                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2391         struct la_entry *la, *prev;
2392         int ret;
2393
2394         /* walk the list backwards, to do the first entry first, as we
2395          * added the entries with DLIST_ADD() which puts them at the
2396          * start of the list */
2397         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2398
2399         for (; la; la=prev) {
2400                 prev = la->prev;
2401                 DLIST_REMOVE(replmd_private->la_list, la);
2402                 ret = replmd_process_linked_attribute(module, la);
2403                 if (ret != LDB_SUCCESS) {
2404                         talloc_free(replmd_private->la_ctx);
2405                         replmd_private->la_list = NULL;
2406                         replmd_private->la_ctx = NULL;
2407                         return ret;
2408                 }
2409         }
2410
2411         talloc_free(replmd_private->la_ctx);
2412         replmd_private->la_list = NULL;
2413         replmd_private->la_ctx = NULL;
2414
2415         /* possibly change @REPLCHANGED */
2416         ret = replmd_notify_store(module);
2417         if (ret != LDB_SUCCESS) {
2418                 return ret;
2419         }
2420         
2421         return ldb_next_prepare_commit(module);
2422 }
2423
2424 static int replmd_del_transaction(struct ldb_module *module)
2425 {
2426         struct replmd_private *replmd_private = 
2427                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2428         talloc_free(replmd_private->la_ctx);
2429         replmd_private->la_list = NULL;
2430         replmd_private->la_ctx = NULL;
2431         return ldb_next_del_trans(module);
2432 }
2433
2434
2435 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2436         .name          = "repl_meta_data",
2437         .init_context      = replmd_init,
2438         .add               = replmd_add,
2439         .modify            = replmd_modify,
2440         .rename            = replmd_rename,
2441         .extended          = replmd_extended,
2442         .start_transaction = replmd_start_transaction,
2443         .prepare_commit    = replmd_prepare_commit,
2444         .del_transaction   = replmd_del_transaction,
2445 };