s4:dsdb Remove potentially confusing 'partition' control from result
[amitay/samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct nc_entry *prev, *next;
55                 struct ldb_dn *dn;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
60 struct la_entry {
61         struct la_entry *next, *prev;
62         struct drsuapi_DsReplicaLinkedAttribute *la;
63 };
64
65 struct replmd_replicated_request {
66         struct ldb_module *module;
67         struct ldb_request *req;
68
69         const struct dsdb_schema *schema;
70
71         /* the controls we pass down */
72         struct ldb_control **controls;
73
74         /* details for the mode where we apply a bunch of inbound replication meessages */
75         bool apply_mode;
76         uint32_t index_current;
77         struct dsdb_extended_replicated_objects *objs;
78
79         struct ldb_message *search_msg;
80
81         uint64_t seq_num;
82
83 };
84
85 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
86
87 /*
88   initialise the module
89   allocate the private structure and build the list
90   of partition DNs for use by replmd_notify()
91  */
92 static int replmd_init(struct ldb_module *module)
93 {
94         struct replmd_private *replmd_private;
95         struct ldb_context *ldb = ldb_module_get_ctx(module);
96
97         replmd_private = talloc_zero(module, struct replmd_private);
98         if (replmd_private == NULL) {
99                 ldb_oom(ldb);
100                 return LDB_ERR_OPERATIONS_ERROR;
101         }
102         ldb_module_set_private(module, replmd_private);
103
104         return ldb_next_init(module);
105 }
106
107
108 /*
109  * Callback for most write operations in this module:
110  * 
111  * notify the repl task that a object has changed. The notifies are
112  * gathered up in the replmd_private structure then written to the
113  * @REPLCHANGED object in each partition during the prepare_commit
114  */
115 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
116 {
117         int ret;
118         struct replmd_replicated_request *ac = 
119                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
120         struct replmd_private *replmd_private = 
121                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
122         struct nc_entry *modified_partition;
123         struct ldb_control *partition_ctrl;
124         const struct dsdb_control_current_partition *partition;
125
126         struct ldb_control **controls;
127
128         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
129
130         /* Remove the 'partition' control from what we pass up the chain */
131         controls = controls_except_specified(ares->controls, ares, partition_ctrl);
132
133         if (ares->error != LDB_SUCCESS) {
134                 return ldb_module_done(ac->req, controls,
135                                         ares->response, ares->error);
136         }
137
138         if (ares->type != LDB_REPLY_DONE) {
139                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
140                 return ldb_module_done(ac->req, NULL,
141                                        NULL, LDB_ERR_OPERATIONS_ERROR);
142         }
143
144         if (!partition_ctrl) {
145                 return ldb_module_done(ac->req, NULL,
146                                        NULL, LDB_ERR_OPERATIONS_ERROR);
147         }
148
149         partition = talloc_get_type_abort(partition_ctrl->data,
150                                     struct dsdb_control_current_partition);
151         
152         if (ac->seq_num > 0) {
153                 for (modified_partition = replmd_private->ncs; modified_partition; 
154                      modified_partition = modified_partition->next) {
155                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
156                                 break;
157                         }
158                 }
159                 
160                 if (modified_partition == NULL) {
161                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
162                         if (!modified_partition) {
163                                 ldb_oom(ldb_module_get_ctx(ac->module));
164                                 return ldb_module_done(ac->req, NULL,
165                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
166                         }
167                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
168                         if (!modified_partition->dn) {
169                                 ldb_oom(ldb_module_get_ctx(ac->module));
170                                 return ldb_module_done(ac->req, NULL,
171                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
172                         }
173                         DLIST_ADD(replmd_private->ncs, modified_partition);
174                 }
175
176                 if (ac->seq_num > modified_partition->mod_usn) {
177                         modified_partition->mod_usn = ac->seq_num;
178                 }
179         }
180
181         if (ac->apply_mode) {
182                 talloc_free(ares);
183                 ac->index_current++;
184                 
185                 ret = replmd_replicated_apply_next(ac);
186                 if (ret != LDB_SUCCESS) {
187                         return ldb_module_done(ac->req, NULL, NULL, ret);
188                 }
189                 return ret;
190         } else {
191                 /* free the partition control container here, for the
192                  * common path.  Other cases will have it cleaned up
193                  * eventually with the ares */
194                 talloc_free(partition_ctrl);
195                 return ldb_module_done(ac->req, 
196                                        controls_except_specified(controls, ares, partition_ctrl),
197                                        ares->response, LDB_SUCCESS);
198         }
199 }
200
201
202 /*
203  * update a @REPLCHANGED record in each partition if there have been
204  * any writes of replicated data in the partition
205  */
206 static int replmd_notify_store(struct ldb_module *module)
207 {
208         struct replmd_private *replmd_private = 
209                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
210         struct ldb_context *ldb = ldb_module_get_ctx(module);
211         struct nc_entry *modified_partition;
212
213         for (modified_partition = replmd_private->ncs; modified_partition; 
214              modified_partition = modified_partition->next) {
215                 int ret;
216
217                 ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
218                 if (ret != LDB_SUCCESS) {
219                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
220                                  ldb_dn_get_linearized(modified_partition->dn)));
221                         return ret;
222                 }
223         }
224
225         return LDB_SUCCESS;
226 }
227
228
229 /*
230   created a replmd_replicated_request context
231  */
232 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
233                                                          struct ldb_request *req)
234 {
235         struct ldb_context *ldb;
236         struct replmd_replicated_request *ac;
237
238         ldb = ldb_module_get_ctx(module);
239
240         ac = talloc_zero(req, struct replmd_replicated_request);
241         if (ac == NULL) {
242                 ldb_oom(ldb);
243                 return NULL;
244         }
245
246         ac->module = module;
247         ac->req = req;
248
249         ac->schema = dsdb_get_schema(ldb);
250         if (!ac->schema) {
251                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
252                               "replmd_modify: no dsdb_schema loaded");
253                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
254                 return NULL;
255         }
256
257         return ac;
258 }
259
260 /*
261   add a time element to a record
262 */
263 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
264 {
265         struct ldb_message_element *el;
266         char *s;
267
268         if (ldb_msg_find_element(msg, attr) != NULL) {
269                 return LDB_SUCCESS;
270         }
271
272         s = ldb_timestring(msg, t);
273         if (s == NULL) {
274                 return LDB_ERR_OPERATIONS_ERROR;
275         }
276
277         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
278                 return LDB_ERR_OPERATIONS_ERROR;
279         }
280
281         el = ldb_msg_find_element(msg, attr);
282         /* always set as replace. This works because on add ops, the flag
283            is ignored */
284         el->flags = LDB_FLAG_MOD_REPLACE;
285
286         return LDB_SUCCESS;
287 }
288
289 /*
290   add a uint64_t element to a record
291 */
292 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
293 {
294         struct ldb_message_element *el;
295
296         if (ldb_msg_find_element(msg, attr) != NULL) {
297                 return LDB_SUCCESS;
298         }
299
300         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
301                 return LDB_ERR_OPERATIONS_ERROR;
302         }
303
304         el = ldb_msg_find_element(msg, attr);
305         /* always set as replace. This works because on add ops, the flag
306            is ignored */
307         el->flags = LDB_FLAG_MOD_REPLACE;
308
309         return LDB_SUCCESS;
310 }
311
312 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
313                                                    const struct replPropertyMetaData1 *m2,
314                                                    const uint32_t *rdn_attid)
315 {
316         if (m1->attid == m2->attid) {
317                 return 0;
318         }
319
320         /*
321          * the rdn attribute should be at the end!
322          * so we need to return a value greater than zero
323          * which means m1 is greater than m2
324          */
325         if (m1->attid == *rdn_attid) {
326                 return 1;
327         }
328
329         /*
330          * the rdn attribute should be at the end!
331          * so we need to return a value less than zero
332          * which means m2 is greater than m1
333          */
334         if (m2->attid == *rdn_attid) {
335                 return -1;
336         }
337
338         return m1->attid - m2->attid;
339 }
340
341 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
342                                                 const struct dsdb_schema *schema,
343                                                 struct ldb_dn *dn)
344 {
345         const char *rdn_name;
346         const struct dsdb_attribute *rdn_sa;
347
348         rdn_name = ldb_dn_get_rdn_name(dn);
349         if (!rdn_name) {
350                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
351                 return LDB_ERR_OPERATIONS_ERROR;
352         }
353
354         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
355         if (rdn_sa == NULL) {
356                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
357                 return LDB_ERR_OPERATIONS_ERROR;                
358         }
359
360         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
361                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
362
363         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
364                   discard_const_p(void, &rdn_sa->attributeID_id), 
365                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
366
367         return LDB_SUCCESS;
368 }
369
370 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
371                                                  const struct ldb_message_element *e2,
372                                                  const struct dsdb_schema *schema)
373 {
374         const struct dsdb_attribute *a1;
375         const struct dsdb_attribute *a2;
376
377         /* 
378          * TODO: make this faster by caching the dsdb_attribute pointer
379          *       on the ldb_messag_element
380          */
381
382         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
383         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
384
385         /*
386          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
387          *       in the schema
388          */
389         if (!a1 || !a2) {
390                 return strcasecmp(e1->name, e2->name);
391         }
392
393         return a1->attributeID_id - a2->attributeID_id;
394 }
395
396 static void replmd_ldb_message_sort(struct ldb_message *msg,
397                                     const struct dsdb_schema *schema)
398 {
399         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
400                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
401 }
402
403 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
404 {
405         struct ldb_context *ldb;
406         struct ldb_control *control;
407         struct ldb_control **saved_controls;
408         struct replmd_replicated_request *ac;
409         enum ndr_err_code ndr_err;
410         struct ldb_request *down_req;
411         struct ldb_message *msg;
412         const DATA_BLOB *guid_blob;
413         struct GUID guid;
414         struct ldb_val guid_value;
415         struct replPropertyMetaDataBlob nmd;
416         struct ldb_val nmd_value;
417         const struct GUID *our_invocation_id;
418         time_t t = time(NULL);
419         NTTIME now;
420         char *time_str;
421         int ret;
422         uint32_t i, ni=0;
423         bool allow_add_guid = false;
424         bool remove_current_guid = false;
425
426         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
427         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
428         if (control) {
429                 allow_add_guid = 1;
430         }
431
432         /* do not manipulate our control entries */
433         if (ldb_dn_is_special(req->op.add.message->dn)) {
434                 return ldb_next_request(module, req);
435         }
436
437         ldb = ldb_module_get_ctx(module);
438
439         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
440
441         ac = replmd_ctx_init(module, req);
442         if (!ac) {
443                 return LDB_ERR_OPERATIONS_ERROR;
444         }
445
446         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
447         if ( guid_blob != NULL ) {
448                 if( !allow_add_guid ) {
449                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
450                               "replmd_add: it's not allowed to add an object with objectGUID\n");
451                         talloc_free(ac);
452                         return LDB_ERR_UNWILLING_TO_PERFORM;
453                 } else {
454                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
455                         if ( !NT_STATUS_IS_OK(status)) {
456                                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
457                                       "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
458                                 talloc_free(ac);
459                                 return LDB_ERR_UNWILLING_TO_PERFORM;
460                         }
461                         /* we remove this attribute as it can be a string and will not be treated 
462                         correctly and then we will readd it latter on in the good format*/
463                         remove_current_guid = true;
464                 }
465         } else {
466                 /* a new GUID */
467                 guid = GUID_random();
468         }
469
470         /* Get a sequence number from the backend */
471         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
472         if (ret != LDB_SUCCESS) {
473                 talloc_free(ac);
474                 return ret;
475         }
476
477         /* get our invocationId */
478         our_invocation_id = samdb_ntds_invocation_id(ldb);
479         if (!our_invocation_id) {
480                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
481                               "replmd_add: unable to find invocationId\n");
482                 talloc_free(ac);
483                 return LDB_ERR_OPERATIONS_ERROR;
484         }
485
486         /* we have to copy the message as the caller might have it as a const */
487         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
488         if (msg == NULL) {
489                 ldb_oom(ldb);
490                 talloc_free(ac);
491                 return LDB_ERR_OPERATIONS_ERROR;
492         }
493
494         /* generated times */
495         unix_to_nt_time(&now, t);
496         time_str = ldb_timestring(msg, t);
497         if (!time_str) {
498                 ldb_oom(ldb);
499                 talloc_free(ac);
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502         if (remove_current_guid) {
503                 ldb_msg_remove_attr(msg,"objectGUID");
504         }
505
506         /* 
507          * remove autogenerated attributes
508          */
509         ldb_msg_remove_attr(msg, "whenCreated");
510         ldb_msg_remove_attr(msg, "whenChanged");
511         ldb_msg_remove_attr(msg, "uSNCreated");
512         ldb_msg_remove_attr(msg, "uSNChanged");
513         ldb_msg_remove_attr(msg, "replPropertyMetaData");
514
515         /*
516          * readd replicated attributes
517          */
518         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
519         if (ret != LDB_SUCCESS) {
520                 ldb_oom(ldb);
521                 talloc_free(ac);
522                 return ret;
523         }
524
525         /* build the replication meta_data */
526         ZERO_STRUCT(nmd);
527         nmd.version             = 1;
528         nmd.ctr.ctr1.count      = msg->num_elements;
529         nmd.ctr.ctr1.array      = talloc_array(msg,
530                                                struct replPropertyMetaData1,
531                                                nmd.ctr.ctr1.count);
532         if (!nmd.ctr.ctr1.array) {
533                 ldb_oom(ldb);
534                 talloc_free(ac);
535                 return LDB_ERR_OPERATIONS_ERROR;
536         }
537
538         for (i=0; i < msg->num_elements; i++) {
539                 struct ldb_message_element *e = &msg->elements[i];
540                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
541                 const struct dsdb_attribute *sa;
542
543                 if (e->name[0] == '@') continue;
544
545                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
546                 if (!sa) {
547                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
548                                       "replmd_add: attribute '%s' not defined in schema\n",
549                                       e->name);
550                         talloc_free(ac);
551                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
552                 }
553
554                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
555                         /* if the attribute is not replicated (0x00000001)
556                          * or constructed (0x00000004) it has no metadata
557                          */
558                         continue;
559                 }
560
561                 m->attid                        = sa->attributeID_id;
562                 m->version                      = 1;
563                 m->originating_change_time      = now;
564                 m->originating_invocation_id    = *our_invocation_id;
565                 m->originating_usn              = ac->seq_num;
566                 m->local_usn                    = ac->seq_num;
567                 ni++;
568         }
569
570         /* fix meta data count */
571         nmd.ctr.ctr1.count = ni;
572
573         /*
574          * sort meta data array, and move the rdn attribute entry to the end
575          */
576         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
577         if (ret != LDB_SUCCESS) {
578                 talloc_free(ac);
579                 return ret;
580         }
581
582         /* generated NDR encoded values */
583         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
584                                        NULL,
585                                        &guid,
586                                        (ndr_push_flags_fn_t)ndr_push_GUID);
587         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
588                 ldb_oom(ldb);
589                 talloc_free(ac);
590                 return LDB_ERR_OPERATIONS_ERROR;
591         }
592         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
593                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
594                                        &nmd,
595                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
596         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
597                 ldb_oom(ldb);
598                 talloc_free(ac);
599                 return LDB_ERR_OPERATIONS_ERROR;
600         }
601
602         /*
603          * add the autogenerated values
604          */
605         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
606         if (ret != LDB_SUCCESS) {
607                 ldb_oom(ldb);
608                 talloc_free(ac);
609                 return ret;
610         }
611         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
612         if (ret != LDB_SUCCESS) {
613                 ldb_oom(ldb);
614                 talloc_free(ac);
615                 return ret;
616         }
617         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
618         if (ret != LDB_SUCCESS) {
619                 ldb_oom(ldb);
620                 talloc_free(ac);
621                 return ret;
622         }
623         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
624         if (ret != LDB_SUCCESS) {
625                 ldb_oom(ldb);
626                 talloc_free(ac);
627                 return ret;
628         }
629         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
630         if (ret != LDB_SUCCESS) {
631                 ldb_oom(ldb);
632                 talloc_free(ac);
633                 return ret;
634         }
635
636         /*
637          * sort the attributes by attid before storing the object
638          */
639         replmd_ldb_message_sort(msg, ac->schema);
640
641         ret = ldb_build_add_req(&down_req, ldb, ac,
642                                 msg,
643                                 req->controls,
644                                 ac, replmd_op_callback,
645                                 req);
646         if (ret != LDB_SUCCESS) {
647                 talloc_free(ac);
648                 return ret;
649         }
650
651         /* if a control is there remove if from the modified request */
652         if (control && !save_controls(control, down_req, &saved_controls)) {
653                 talloc_free(ac);
654                 return LDB_ERR_OPERATIONS_ERROR;
655         }
656
657         /* go on with the call chain */
658         return ldb_next_request(module, down_req);
659 }
660
661
662 /*
663  * update the replPropertyMetaData for one element
664  */
665 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
666                                       struct ldb_message *msg,
667                                       struct ldb_message_element *el,
668                                       struct replPropertyMetaDataBlob *omd,
669                                       const struct dsdb_schema *schema,
670                                       uint64_t *seq_num,
671                                       const struct GUID *our_invocation_id,
672                                       NTTIME now)
673 {
674         int i;
675         const struct dsdb_attribute *a;
676         struct replPropertyMetaData1 *md1;
677
678         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
679         if (a == NULL) {
680                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
681                          el->name));
682                 return LDB_ERR_OPERATIONS_ERROR;
683         }
684
685         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
686                 return LDB_SUCCESS;
687         }
688
689         for (i=0; i<omd->ctr.ctr1.count; i++) {
690                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
691         }
692         if (i == omd->ctr.ctr1.count) {
693                 /* we need to add a new one */
694                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
695                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
696                 if (omd->ctr.ctr1.array == NULL) {
697                         ldb_oom(ldb);
698                         return LDB_ERR_OPERATIONS_ERROR;
699                 }
700                 omd->ctr.ctr1.count++;
701                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
702         }
703
704         /* Get a new sequence number from the backend. We only do this
705          * if we have a change that requires a new
706          * replPropertyMetaData element 
707          */
708         if (*seq_num == 0) {
709                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
710                 if (ret != LDB_SUCCESS) {
711                         return LDB_ERR_OPERATIONS_ERROR;
712                 }
713         }
714
715         md1 = &omd->ctr.ctr1.array[i];
716         md1->version++;
717         md1->attid                     = a->attributeID_id;
718         md1->originating_change_time   = now;
719         md1->originating_invocation_id = *our_invocation_id;
720         md1->originating_usn           = *seq_num;
721         md1->local_usn                 = *seq_num;
722         
723         return LDB_SUCCESS;
724 }
725
726 /*
727  * update the replPropertyMetaData object each time we modify an
728  * object. This is needed for DRS replication, as the merge on the
729  * client is based on this object 
730  */
731 static int replmd_update_rpmd(struct ldb_module *module, 
732                               const struct dsdb_schema *schema, 
733                               struct ldb_message *msg, uint64_t *seq_num)
734 {
735         const struct ldb_val *omd_value;
736         enum ndr_err_code ndr_err;
737         struct replPropertyMetaDataBlob omd;
738         int i;
739         time_t t = time(NULL);
740         NTTIME now;
741         const struct GUID *our_invocation_id;
742         int ret;
743         const char *attrs[] = { "replPropertyMetaData" , NULL };
744         struct ldb_result *res;
745         struct ldb_context *ldb;
746
747         ldb = ldb_module_get_ctx(module);
748
749         our_invocation_id = samdb_ntds_invocation_id(ldb);
750         if (!our_invocation_id) {
751                 /* this happens during an initial vampire while
752                    updating the schema */
753                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
754                 return LDB_SUCCESS;
755         }
756
757         unix_to_nt_time(&now, t);
758
759         /* search for the existing replPropertyMetaDataBlob */
760         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
761         if (ret != LDB_SUCCESS || res->count != 1) {
762                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
763                          ldb_dn_get_linearized(msg->dn)));
764                 return LDB_ERR_OPERATIONS_ERROR;
765         }
766                 
767
768         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
769         if (!omd_value) {
770                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
771                          ldb_dn_get_linearized(msg->dn)));
772                 return LDB_ERR_OPERATIONS_ERROR;
773         }
774
775         ndr_err = ndr_pull_struct_blob(omd_value, msg,
776                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
777                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
778         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
779                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
780                          ldb_dn_get_linearized(msg->dn)));
781                 return LDB_ERR_OPERATIONS_ERROR;
782         }
783
784         if (omd.version != 1) {
785                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
786                          omd.version, ldb_dn_get_linearized(msg->dn)));
787                 return LDB_ERR_OPERATIONS_ERROR;
788         }
789
790         for (i=0; i<msg->num_elements; i++) {
791                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
792                                                  our_invocation_id, now);
793                 if (ret != LDB_SUCCESS) {
794                         return ret;
795                 }
796         }
797
798         /*
799          * replmd_update_rpmd_element has done an update if the
800          * seq_num is set
801          */
802         if (*seq_num != 0) {
803                 struct ldb_val *md_value;
804                 struct ldb_message_element *el;
805
806                 md_value = talloc(msg, struct ldb_val);
807                 if (md_value == NULL) {
808                         ldb_oom(ldb);
809                         return LDB_ERR_OPERATIONS_ERROR;
810                 }
811
812                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
813                 if (ret != LDB_SUCCESS) {
814                         return ret;
815                 }
816
817                 ndr_err = ndr_push_struct_blob(md_value, msg, 
818                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
819                                                &omd,
820                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
821                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
822                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
823                                  ldb_dn_get_linearized(msg->dn)));
824                         return LDB_ERR_OPERATIONS_ERROR;
825                 }
826
827                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
828                 if (ret != LDB_SUCCESS) {
829                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
830                                  ldb_dn_get_linearized(msg->dn)));
831                         return ret;
832                 }
833
834                 el->num_values = 1;
835                 el->values = md_value;
836         }
837
838         return LDB_SUCCESS;     
839 }
840
841
842 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
843 {
844         struct ldb_context *ldb;
845         struct replmd_replicated_request *ac;
846         struct ldb_request *down_req;
847         struct ldb_message *msg;
848         struct ldb_result *res;
849         time_t t = time(NULL);
850         uint64_t seq_num = 0;
851         int ret;
852
853         /* do not manipulate our control entries */
854         if (ldb_dn_is_special(req->op.mod.message->dn)) {
855                 return ldb_next_request(module, req);
856         }
857
858         ldb = ldb_module_get_ctx(module);
859
860         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
861
862         ac = replmd_ctx_init(module, req);
863         if (!ac) {
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         /* we have to copy the message as the caller might have it as a const */
868         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
869         if (msg == NULL) {
870                 ldb_oom(ldb);
871                 talloc_free(ac);
872                 return LDB_ERR_OPERATIONS_ERROR;
873         }
874
875         /* TODO:
876          * - give an error when a readonly attribute should
877          *   be modified
878          * - merge the changed into the old object
879          *   if the caller set values to the same value
880          *   ignore the attribute, return success when no
881          *   attribute was changed
882          */
883
884         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
885         if (ret != LDB_SUCCESS) {
886                 talloc_free(ac);
887                 return ret;
888         }
889
890         ret = replmd_update_rpmd(module, ac->schema, msg, &ac->seq_num);
891         if (ret != LDB_SUCCESS) {
892                 talloc_free(ac);
893                 return ret;
894         }
895
896         /* TODO:
897          * - replace the old object with the newly constructed one
898          */
899
900         ret = ldb_build_mod_req(&down_req, ldb, ac,
901                                 msg,
902                                 req->controls,
903                                 ac, replmd_op_callback,
904                                 req);
905         if (ret != LDB_SUCCESS) {
906                 talloc_free(ac);
907                 return ret;
908         }
909         talloc_steal(down_req, msg);
910
911         /* we only change whenChanged and uSNChanged if the seq_num
912            has changed */
913         if (seq_num != 0) {
914                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
915                         talloc_free(ac);
916                         return ret;
917                 }
918
919                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
920                         talloc_free(ac);
921                         return ret;
922                 }
923         }
924
925         /* go on with the call chain */
926         return ldb_next_request(module, down_req);
927 }
928
929 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
930
931 /*
932   handle a rename request
933
934   On a rename we need to do an extra ldb_modify which sets the
935   whenChanged and uSNChanged attributes.  We do this in a callback after the success.
936  */
937 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
938 {
939         struct ldb_context *ldb;
940         struct replmd_replicated_request *ac;
941         int ret;
942         struct ldb_request *down_req;
943
944         /* do not manipulate our control entries */
945         if (ldb_dn_is_special(req->op.mod.message->dn)) {
946                 return ldb_next_request(module, req);
947         }
948
949         ldb = ldb_module_get_ctx(module);
950
951         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
952
953         ac = replmd_ctx_init(module, req);
954         if (!ac) {
955                 return LDB_ERR_OPERATIONS_ERROR;
956         }
957         ret = ldb_build_rename_req(&down_req, ldb, ac,
958                                    ac->req->op.rename.olddn,
959                                    ac->req->op.rename.newdn,
960                                    ac->req->controls,
961                                    ac, replmd_rename_callback,
962                                    ac->req);
963
964         if (ret != LDB_SUCCESS) {
965                 talloc_free(ac);
966                 return ret;
967         }
968
969         /* go on with the call chain */
970         return ldb_next_request(module, down_req);
971 }
972
973 /* After the rename is compleated, update the whenchanged etc */
974 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
975 {
976         struct ldb_context *ldb;
977         struct replmd_replicated_request *ac;
978         struct ldb_request *down_req;
979         struct ldb_message *msg;
980         time_t t = time(NULL);
981         int ret;
982
983         ac = talloc_get_type(req->context, struct replmd_replicated_request);
984         ldb = ldb_module_get_ctx(ac->module);
985
986         if (ares->error != LDB_SUCCESS) {
987                 return ldb_module_done(ac->req, ares->controls,
988                                         ares->response, ares->error);
989         }
990
991         if (ares->type != LDB_REPLY_DONE) {
992                 ldb_set_errstring(ldb,
993                                   "invalid ldb_reply_type in callback");
994                 talloc_free(ares);
995                 return ldb_module_done(ac->req, NULL, NULL,
996                                         LDB_ERR_OPERATIONS_ERROR);
997         }
998
999         /* Get a sequence number from the backend */
1000         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
1001         if (ret != LDB_SUCCESS) {
1002                 return ret;
1003         }
1004
1005         /* TODO:
1006          * - replace the old object with the newly constructed one
1007          */
1008
1009         msg = ldb_msg_new(ac);
1010         if (msg == NULL) {
1011                 ldb_oom(ldb);
1012                 return LDB_ERR_OPERATIONS_ERROR;
1013         }
1014
1015         msg->dn = ac->req->op.rename.newdn;
1016
1017         ret = ldb_build_mod_req(&down_req, ldb, ac,
1018                                 msg,
1019                                 req->controls,
1020                                 ac, replmd_op_callback,
1021                                 req);
1022
1023         if (ret != LDB_SUCCESS) {
1024                 talloc_free(ac);
1025                 return ret;
1026         }
1027         talloc_steal(down_req, msg);
1028
1029         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1030                 talloc_free(ac);
1031                 return ret;
1032         }
1033         
1034         if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1035                 talloc_free(ac);
1036                 return ret;
1037         }
1038
1039         /* go on with the call chain - do the modify after the rename */
1040         return ldb_next_request(ac->module, down_req);
1041 }
1042
1043
1044 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
1045 {
1046         return ret;
1047 }
1048
1049 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1050 {
1051         int ret = LDB_ERR_OTHER;
1052         /* TODO: do some error mapping */
1053         return ret;
1054 }
1055
1056 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1057 {
1058         struct ldb_context *ldb;
1059         struct ldb_request *change_req;
1060         enum ndr_err_code ndr_err;
1061         struct ldb_message *msg;
1062         struct replPropertyMetaDataBlob *md;
1063         struct ldb_val md_value;
1064         uint32_t i;
1065         int ret;
1066
1067         /*
1068          * TODO: check if the parent object exist
1069          */
1070
1071         /*
1072          * TODO: handle the conflict case where an object with the
1073          *       same name exist
1074          */
1075
1076         ldb = ldb_module_get_ctx(ar->module);
1077         msg = ar->objs->objects[ar->index_current].msg;
1078         md = ar->objs->objects[ar->index_current].meta_data;
1079
1080         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1081         if (ret != LDB_SUCCESS) {
1082                 return replmd_replicated_request_error(ar, ret);
1083         }
1084
1085         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1086         if (ret != LDB_SUCCESS) {
1087                 return replmd_replicated_request_error(ar, ret);
1088         }
1089
1090         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1091         if (ret != LDB_SUCCESS) {
1092                 return replmd_replicated_request_error(ar, ret);
1093         }
1094
1095         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
1096         if (ret != LDB_SUCCESS) {
1097                 return replmd_replicated_request_error(ar, ret);
1098         }
1099
1100         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1101         if (ret != LDB_SUCCESS) {
1102                 return replmd_replicated_request_error(ar, ret);
1103         }
1104
1105         /* remove any message elements that have zero values */
1106         for (i=0; i<msg->num_elements; i++) {
1107                 if (msg->elements[i].num_values == 0) {
1108                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1109                                  msg->elements[i].name));
1110                         memmove(&msg->elements[i], 
1111                                 &msg->elements[i+1], 
1112                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1113                         msg->num_elements--;
1114                         i--;
1115                 }
1116         }
1117         
1118         /*
1119          * the meta data array is already sorted by the caller
1120          */
1121         for (i=0; i < md->ctr.ctr1.count; i++) {
1122                 md->ctr.ctr1.array[i].local_usn = ar->seq_num;
1123         }
1124         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1125                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1126                                        md,
1127                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1128         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1129                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1130                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1131         }
1132         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1133         if (ret != LDB_SUCCESS) {
1134                 return replmd_replicated_request_error(ar, ret);
1135         }
1136
1137         replmd_ldb_message_sort(msg, ar->schema);
1138
1139         if (DEBUGLVL(4)) {
1140                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1141                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1142                 talloc_free(s);
1143         }
1144
1145         ret = ldb_build_add_req(&change_req,
1146                                 ldb,
1147                                 ar,
1148                                 msg,
1149                                 ar->controls,
1150                                 ar,
1151                                 replmd_op_callback,
1152                                 ar->req);
1153         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1154
1155         return ldb_next_request(ar->module, change_req);
1156 }
1157
1158 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1159                                                          struct replPropertyMetaData1 *m2)
1160 {
1161         int ret;
1162
1163         if (m1->version != m2->version) {
1164                 return m1->version - m2->version;
1165         }
1166
1167         if (m1->originating_change_time != m2->originating_change_time) {
1168                 return m1->originating_change_time - m2->originating_change_time;
1169         }
1170
1171         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1172         if (ret != 0) {
1173                 return ret;
1174         }
1175
1176         return m1->originating_usn - m2->originating_usn;
1177 }
1178
1179 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1180 {
1181         struct ldb_context *ldb;
1182         struct ldb_request *change_req;
1183         enum ndr_err_code ndr_err;
1184         struct ldb_message *msg;
1185         struct replPropertyMetaDataBlob *rmd;
1186         struct replPropertyMetaDataBlob omd;
1187         const struct ldb_val *omd_value;
1188         struct replPropertyMetaDataBlob nmd;
1189         struct ldb_val nmd_value;
1190         uint32_t i,j,ni=0;
1191         uint32_t removed_attrs = 0;
1192         int ret;
1193
1194         ldb = ldb_module_get_ctx(ar->module);
1195         msg = ar->objs->objects[ar->index_current].msg;
1196         rmd = ar->objs->objects[ar->index_current].meta_data;
1197         ZERO_STRUCT(omd);
1198         omd.version = 1;
1199
1200         /*
1201          * TODO: check repl data is correct after a rename
1202          */
1203         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1204                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1205                           ldb_dn_get_linearized(ar->search_msg->dn),
1206                           ldb_dn_get_linearized(msg->dn));
1207                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1208                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1209                                   ldb_dn_get_linearized(ar->search_msg->dn),
1210                                   ldb_dn_get_linearized(msg->dn),
1211                                   ldb_errstring(ldb));
1212                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1213                 }
1214         }
1215
1216         /* find existing meta data */
1217         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1218         if (omd_value) {
1219                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1220                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1221                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1222                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1223                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1224                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1225                 }
1226
1227                 if (omd.version != 1) {
1228                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1229                 }
1230         }
1231
1232         ZERO_STRUCT(nmd);
1233         nmd.version = 1;
1234         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1235         nmd.ctr.ctr1.array = talloc_array(ar,
1236                                           struct replPropertyMetaData1,
1237                                           nmd.ctr.ctr1.count);
1238         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1239
1240         /* first copy the old meta data */
1241         for (i=0; i < omd.ctr.ctr1.count; i++) {
1242                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1243                 ni++;
1244         }
1245
1246         /* now merge in the new meta data */
1247         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1248                 bool found = false;
1249
1250                 for (j=0; j < ni; j++) {
1251                         int cmp;
1252
1253                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1254                                 continue;
1255                         }
1256
1257                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1258                                                                             &nmd.ctr.ctr1.array[j]);
1259                         if (cmp > 0) {
1260                                 /* replace the entry */
1261                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1262                                 found = true;
1263                                 break;
1264                         }
1265
1266                         /* we don't want to apply this change so remove the attribute */
1267                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1268                         removed_attrs++;
1269
1270                         found = true;
1271                         break;
1272                 }
1273
1274                 if (found) continue;
1275
1276                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1277                 ni++;
1278         }
1279
1280         /*
1281          * finally correct the size of the meta_data array
1282          */
1283         nmd.ctr.ctr1.count = ni;
1284
1285         /*
1286          * the rdn attribute (the alias for the name attribute),
1287          * 'cn' for most objects is the last entry in the meta data array
1288          * we have stored
1289          *
1290          * sort the new meta data array
1291          */
1292         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
1293         if (ret != LDB_SUCCESS) {
1294                 return ret;
1295         }
1296
1297         /*
1298          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1299          */
1300         if (msg->num_elements == 0) {
1301                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1302                           ar->index_current);
1303
1304                 ar->index_current++;
1305                 return replmd_replicated_apply_next(ar);
1306         }
1307
1308         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1309                   ar->index_current, msg->num_elements);
1310
1311         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1312         if (ret != LDB_SUCCESS) {
1313                 return replmd_replicated_request_error(ar, ret);
1314         }
1315
1316         for (i=0; i<ni; i++) {
1317                 nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
1318         }
1319
1320         /* create the meta data value */
1321         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1322                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1323                                        &nmd,
1324                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1325         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1326                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1327                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1328         }
1329
1330         /*
1331          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1332          * and replPopertyMetaData attributes
1333          */
1334         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1335         if (ret != LDB_SUCCESS) {
1336                 return replmd_replicated_request_error(ar, ret);
1337         }
1338         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1339         if (ret != LDB_SUCCESS) {
1340                 return replmd_replicated_request_error(ar, ret);
1341         }
1342         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1343         if (ret != LDB_SUCCESS) {
1344                 return replmd_replicated_request_error(ar, ret);
1345         }
1346
1347         replmd_ldb_message_sort(msg, ar->schema);
1348
1349         /* we want to replace the old values */
1350         for (i=0; i < msg->num_elements; i++) {
1351                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1352         }
1353
1354         if (DEBUGLVL(4)) {
1355                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1356                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1357                 talloc_free(s);
1358         }
1359
1360         ret = ldb_build_mod_req(&change_req,
1361                                 ldb,
1362                                 ar,
1363                                 msg,
1364                                 ar->controls,
1365                                 ar,
1366                                 replmd_op_callback,
1367                                 ar->req);
1368         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1369
1370         return ldb_next_request(ar->module, change_req);
1371 }
1372
1373 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1374                                                    struct ldb_reply *ares)
1375 {
1376         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1377                                                struct replmd_replicated_request);
1378         int ret;
1379
1380         if (!ares) {
1381                 return ldb_module_done(ar->req, NULL, NULL,
1382                                         LDB_ERR_OPERATIONS_ERROR);
1383         }
1384         if (ares->error != LDB_SUCCESS &&
1385             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1386                 return ldb_module_done(ar->req, ares->controls,
1387                                         ares->response, ares->error);
1388         }
1389
1390         switch (ares->type) {
1391         case LDB_REPLY_ENTRY:
1392                 ar->search_msg = talloc_steal(ar, ares->message);
1393                 break;
1394
1395         case LDB_REPLY_REFERRAL:
1396                 /* we ignore referrals */
1397                 break;
1398
1399         case LDB_REPLY_DONE:
1400                 if (ar->search_msg != NULL) {
1401                         ret = replmd_replicated_apply_merge(ar);
1402                 } else {
1403                         ret = replmd_replicated_apply_add(ar);
1404                 }
1405                 if (ret != LDB_SUCCESS) {
1406                         return ldb_module_done(ar->req, NULL, NULL, ret);
1407                 }
1408         }
1409
1410         talloc_free(ares);
1411         return LDB_SUCCESS;
1412 }
1413
1414 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1415
1416 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1417 {
1418         struct ldb_context *ldb;
1419         int ret;
1420         char *tmp_str;
1421         char *filter;
1422         struct ldb_request *search_req;
1423         struct ldb_search_options_control *options;
1424
1425         if (ar->index_current >= ar->objs->num_objects) {
1426                 /* done with it, go to next stage */
1427                 return replmd_replicated_uptodate_vector(ar);
1428         }
1429
1430         ldb = ldb_module_get_ctx(ar->module);
1431         ar->search_msg = NULL;
1432
1433         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1434         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1435
1436         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1437         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1438         talloc_free(tmp_str);
1439
1440         ret = ldb_build_search_req(&search_req,
1441                                    ldb,
1442                                    ar,
1443                                    NULL,
1444                                    LDB_SCOPE_SUBTREE,
1445                                    filter,
1446                                    NULL,
1447                                    NULL,
1448                                    ar,
1449                                    replmd_replicated_apply_search_callback,
1450                                    ar->req);
1451
1452         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
1453         if (ret != LDB_SUCCESS) {
1454                 return ret;
1455         }
1456
1457         /* we need to cope with cross-partition links, so search for
1458            the GUID over all partitions */
1459         options = talloc(search_req, struct ldb_search_options_control);
1460         if (options == NULL) {
1461                 DEBUG(0, (__location__ ": out of memory\n"));
1462                 return LDB_ERR_OPERATIONS_ERROR;
1463         }
1464         options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
1465
1466         ret = ldb_request_add_control(search_req,
1467                                       LDB_CONTROL_SEARCH_OPTIONS_OID,
1468                                       true, options);
1469         if (ret != LDB_SUCCESS) {
1470                 return ret;
1471         }
1472
1473         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1474
1475         return ldb_next_request(ar->module, search_req);
1476 }
1477
1478 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1479                                                       struct ldb_reply *ares)
1480 {
1481         struct ldb_context *ldb;
1482         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1483                                                struct replmd_replicated_request);
1484         ldb = ldb_module_get_ctx(ar->module);
1485
1486         if (!ares) {
1487                 return ldb_module_done(ar->req, NULL, NULL,
1488                                         LDB_ERR_OPERATIONS_ERROR);
1489         }
1490         if (ares->error != LDB_SUCCESS) {
1491                 return ldb_module_done(ar->req, ares->controls,
1492                                         ares->response, ares->error);
1493         }
1494
1495         if (ares->type != LDB_REPLY_DONE) {
1496                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1497                 return ldb_module_done(ar->req, NULL, NULL,
1498                                         LDB_ERR_OPERATIONS_ERROR);
1499         }
1500
1501         talloc_free(ares);
1502
1503         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1504 }
1505
1506 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1507 {
1508         struct ldb_context *ldb;
1509         struct ldb_request *change_req;
1510         enum ndr_err_code ndr_err;
1511         struct ldb_message *msg;
1512         struct replUpToDateVectorBlob ouv;
1513         const struct ldb_val *ouv_value;
1514         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1515         struct replUpToDateVectorBlob nuv;
1516         struct ldb_val nuv_value;
1517         struct ldb_message_element *nuv_el = NULL;
1518         const struct GUID *our_invocation_id;
1519         struct ldb_message_element *orf_el = NULL;
1520         struct repsFromToBlob nrf;
1521         struct ldb_val *nrf_value = NULL;
1522         struct ldb_message_element *nrf_el = NULL;
1523         uint32_t i,j,ni=0;
1524         bool found = false;
1525         time_t t = time(NULL);
1526         NTTIME now;
1527         int ret;
1528
1529         ldb = ldb_module_get_ctx(ar->module);
1530         ruv = ar->objs->uptodateness_vector;
1531         ZERO_STRUCT(ouv);
1532         ouv.version = 2;
1533         ZERO_STRUCT(nuv);
1534         nuv.version = 2;
1535
1536         unix_to_nt_time(&now, t);
1537
1538         /*
1539          * first create the new replUpToDateVector
1540          */
1541         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1542         if (ouv_value) {
1543                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1544                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1545                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1546                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1547                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1548                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1549                 }
1550
1551                 if (ouv.version != 2) {
1552                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1553                 }
1554         }
1555
1556         /*
1557          * the new uptodateness vector will at least
1558          * contain 1 entry, one for the source_dsa
1559          *
1560          * plus optional values from our old vector and the one from the source_dsa
1561          */
1562         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1563         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1564         nuv.ctr.ctr2.cursors = talloc_array(ar,
1565                                             struct drsuapi_DsReplicaCursor2,
1566                                             nuv.ctr.ctr2.count);
1567         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1568
1569         /* first copy the old vector */
1570         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1571                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1572                 ni++;
1573         }
1574
1575         /* get our invocation_id if we have one already attached to the ldb */
1576         our_invocation_id = samdb_ntds_invocation_id(ldb);
1577
1578         /* merge in the source_dsa vector is available */
1579         for (i=0; (ruv && i < ruv->count); i++) {
1580                 found = false;
1581
1582                 if (our_invocation_id &&
1583                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1584                                our_invocation_id)) {
1585                         continue;
1586                 }
1587
1588                 for (j=0; j < ni; j++) {
1589                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1590                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1591                                 continue;
1592                         }
1593
1594                         found = true;
1595
1596                         /*
1597                          * we update only the highest_usn and not the latest_sync_success time,
1598                          * because the last success stands for direct replication
1599                          */
1600                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1601                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1602                         }
1603                         break;                  
1604                 }
1605
1606                 if (found) continue;
1607
1608                 /* if it's not there yet, add it */
1609                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1610                 ni++;
1611         }
1612
1613         /*
1614          * merge in the current highwatermark for the source_dsa
1615          */
1616         found = false;
1617         for (j=0; j < ni; j++) {
1618                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1619                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1620                         continue;
1621                 }
1622
1623                 found = true;
1624
1625                 /*
1626                  * here we update the highest_usn and last_sync_success time
1627                  * because we're directly replicating from the source_dsa
1628                  *
1629                  * and use the tmp_highest_usn because this is what we have just applied
1630                  * to our ldb
1631                  */
1632                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1633                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1634                 break;
1635         }
1636         if (!found) {
1637                 /*
1638                  * here we update the highest_usn and last_sync_success time
1639                  * because we're directly replicating from the source_dsa
1640                  *
1641                  * and use the tmp_highest_usn because this is what we have just applied
1642                  * to our ldb
1643                  */
1644                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1645                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1646                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1647                 ni++;
1648         }
1649
1650         /*
1651          * finally correct the size of the cursors array
1652          */
1653         nuv.ctr.ctr2.count = ni;
1654
1655         /*
1656          * sort the cursors
1657          */
1658         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1659               sizeof(struct drsuapi_DsReplicaCursor2),
1660               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1661
1662         /*
1663          * create the change ldb_message
1664          */
1665         msg = ldb_msg_new(ar);
1666         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1667         msg->dn = ar->search_msg->dn;
1668
1669         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1670                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1671                                        &nuv,
1672                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1673         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1674                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1675                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1676         }
1677         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1678         if (ret != LDB_SUCCESS) {
1679                 return replmd_replicated_request_error(ar, ret);
1680         }
1681         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1682
1683         /*
1684          * now create the new repsFrom value from the given repsFromTo1 structure
1685          */
1686         ZERO_STRUCT(nrf);
1687         nrf.version                                     = 1;
1688         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1689         /* and fix some values... */
1690         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1691         nrf.ctr.ctr1.last_success                       = now;
1692         nrf.ctr.ctr1.last_attempt                       = now;
1693         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1694         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1695
1696         /*
1697          * first see if we already have a repsFrom value for the current source dsa
1698          * if so we'll later replace this value
1699          */
1700         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1701         if (orf_el) {
1702                 for (i=0; i < orf_el->num_values; i++) {
1703                         struct repsFromToBlob *trf;
1704
1705                         trf = talloc(ar, struct repsFromToBlob);
1706                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1707
1708                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1709                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1710                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1711                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1712                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1713                         }
1714
1715                         if (trf->version != 1) {
1716                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1717                         }
1718
1719                         /*
1720                          * we compare the source dsa objectGUID not the invocation_id
1721                          * because we want only one repsFrom value per source dsa
1722                          * and when the invocation_id of the source dsa has changed we don't need 
1723                          * the old repsFrom with the old invocation_id
1724                          */
1725                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1726                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1727                                 talloc_free(trf);
1728                                 continue;
1729                         }
1730
1731                         talloc_free(trf);
1732                         nrf_value = &orf_el->values[i];
1733                         break;
1734                 }
1735
1736                 /*
1737                  * copy over all old values to the new ldb_message
1738                  */
1739                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1740                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1741                 *nrf_el = *orf_el;
1742         }
1743
1744         /*
1745          * if we haven't found an old repsFrom value for the current source dsa
1746          * we'll add a new value
1747          */
1748         if (!nrf_value) {
1749                 struct ldb_val zero_value;
1750                 ZERO_STRUCT(zero_value);
1751                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1752                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1753
1754                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1755         }
1756
1757         /* we now fill the value which is already attached to ldb_message */
1758         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1759                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1760                                        &nrf,
1761                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1762         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1763                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1764                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1765         }
1766
1767         /* 
1768          * the ldb_message_element for the attribute, has all the old values and the new one
1769          * so we'll replace the whole attribute with all values
1770          */
1771         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1772
1773         if (DEBUGLVL(4)) {
1774                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1775                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1776                 talloc_free(s);
1777         }
1778
1779         /* prepare the ldb_modify() request */
1780         ret = ldb_build_mod_req(&change_req,
1781                                 ldb,
1782                                 ar,
1783                                 msg,
1784                                 ar->controls,
1785                                 ar,
1786                                 replmd_replicated_uptodate_modify_callback,
1787                                 ar->req);
1788         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1789
1790         return ldb_next_request(ar->module, change_req);
1791 }
1792
1793 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1794                                                       struct ldb_reply *ares)
1795 {
1796         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1797                                                struct replmd_replicated_request);
1798         int ret;
1799
1800         if (!ares) {
1801                 return ldb_module_done(ar->req, NULL, NULL,
1802                                         LDB_ERR_OPERATIONS_ERROR);
1803         }
1804         if (ares->error != LDB_SUCCESS &&
1805             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1806                 return ldb_module_done(ar->req, ares->controls,
1807                                         ares->response, ares->error);
1808         }
1809
1810         switch (ares->type) {
1811         case LDB_REPLY_ENTRY:
1812                 ar->search_msg = talloc_steal(ar, ares->message);
1813                 break;
1814
1815         case LDB_REPLY_REFERRAL:
1816                 /* we ignore referrals */
1817                 break;
1818
1819         case LDB_REPLY_DONE:
1820                 if (ar->search_msg == NULL) {
1821                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1822                 } else {
1823                         ret = replmd_replicated_uptodate_modify(ar);
1824                 }
1825                 if (ret != LDB_SUCCESS) {
1826                         return ldb_module_done(ar->req, NULL, NULL, ret);
1827                 }
1828         }
1829
1830         talloc_free(ares);
1831         return LDB_SUCCESS;
1832 }
1833
1834
1835 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1836 {
1837         struct ldb_context *ldb;
1838         int ret;
1839         static const char *attrs[] = {
1840                 "replUpToDateVector",
1841                 "repsFrom",
1842                 NULL
1843         };
1844         struct ldb_request *search_req;
1845
1846         ldb = ldb_module_get_ctx(ar->module);
1847         ar->search_msg = NULL;
1848
1849         ret = ldb_build_search_req(&search_req,
1850                                    ldb,
1851                                    ar,
1852                                    ar->objs->partition_dn,
1853                                    LDB_SCOPE_BASE,
1854                                    "(objectClass=*)",
1855                                    attrs,
1856                                    NULL,
1857                                    ar,
1858                                    replmd_replicated_uptodate_search_callback,
1859                                    ar->req);
1860         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1861
1862         return ldb_next_request(ar->module, search_req);
1863 }
1864
1865
1866
1867 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1868 {
1869         struct ldb_context *ldb;
1870         struct dsdb_extended_replicated_objects *objs;
1871         struct replmd_replicated_request *ar;
1872         struct ldb_control **ctrls;
1873         int ret, i;
1874         struct replmd_private *replmd_private = 
1875                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1876
1877         ldb = ldb_module_get_ctx(module);
1878
1879         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1880
1881         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1882         if (!objs) {
1883                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1884                 return LDB_ERR_PROTOCOL_ERROR;
1885         }
1886
1887         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1888                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1889                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1890                 return LDB_ERR_PROTOCOL_ERROR;
1891         }
1892
1893         ar = replmd_ctx_init(module, req);
1894         if (!ar)
1895                 return LDB_ERR_OPERATIONS_ERROR;
1896
1897         /* Set the flags to have the replmd_op_callback run over the full set of objects */
1898         ar->apply_mode = true;
1899         ar->objs = objs;
1900         ar->schema = dsdb_get_schema(ldb);
1901         if (!ar->schema) {
1902                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1903                 talloc_free(ar);
1904                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
1905                 return LDB_ERR_CONSTRAINT_VIOLATION;
1906         }
1907
1908         ctrls = req->controls;
1909
1910         if (req->controls) {
1911                 req->controls = talloc_memdup(ar, req->controls,
1912                                               talloc_get_size(req->controls));
1913                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1914         }
1915
1916         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1917         if (ret != LDB_SUCCESS) {
1918                 return ret;
1919         }
1920
1921         ar->controls = req->controls;
1922         req->controls = ctrls;
1923
1924         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1925
1926         /* save away the linked attributes for the end of the
1927            transaction */
1928         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1929                 struct la_entry *la_entry;
1930
1931                 if (replmd_private->la_ctx == NULL) {
1932                         replmd_private->la_ctx = talloc_new(replmd_private);
1933                 }
1934                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
1935                 if (la_entry == NULL) {
1936                         ldb_oom(ldb);
1937                         return LDB_ERR_OPERATIONS_ERROR;
1938                 }
1939                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1940                 if (la_entry->la == NULL) {
1941                         talloc_free(la_entry);
1942                         ldb_oom(ldb);
1943                         return LDB_ERR_OPERATIONS_ERROR;
1944                 }
1945                 *la_entry->la = ar->objs->linked_attributes[i];
1946
1947                 /* we need to steal the non-scalars so they stay
1948                    around until the end of the transaction */
1949                 talloc_steal(la_entry->la, la_entry->la->identifier);
1950                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1951
1952                 DLIST_ADD(replmd_private->la_list, la_entry);
1953         }
1954
1955         return replmd_replicated_apply_next(ar);
1956 }
1957
1958 /*
1959   process one linked attribute structure
1960  */
1961 static int replmd_process_linked_attribute(struct ldb_module *module,
1962                                            struct la_entry *la_entry)
1963 {                                          
1964         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1965         struct ldb_context *ldb = ldb_module_get_ctx(module);
1966         struct dsdb_schema *schema = dsdb_get_schema(ldb);
1967         struct drsuapi_DsReplicaObjectIdentifier3 target;
1968         struct ldb_message *msg;
1969         struct ldb_message_element *ret_el;
1970         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1971         enum ndr_err_code ndr_err;
1972         struct ldb_request *mod_req;
1973         int ret;
1974         const struct dsdb_attribute *attr;
1975         struct ldb_dn *target_dn;
1976         uint64_t seq_num = 0;
1977
1978 /*
1979 linked_attributes[0]:                                                     
1980      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1981         identifier               : *                                      
1982             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1983                 __ndr_size               : 0x0000003a (58)                
1984                 __ndr_size_sid           : 0x00000000 (0)                 
1985                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1986                 sid                      : S-0-0                               
1987                 __ndr_size_dn            : 0x00000000 (0)                      
1988                 dn                       : ''                                  
1989         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1990         value: struct drsuapi_DsAttributeValue                                 
1991             __ndr_size               : 0x0000007e (126)                        
1992             blob                     : *                                       
1993                 blob                     : DATA_BLOB length=126                
1994         flags                    : 0x00000001 (1)                              
1995                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1996         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1997         meta_data: struct drsuapi_DsReplicaMetaData                            
1998             version                  : 0x00000015 (21)                         
1999             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2000             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2001             originating_usn          : 0x000000000001e19c (123292)             
2002      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2003         __ndr_size               : 0x0000007e (126)                            
2004         __ndr_size_sid           : 0x0000001c (28)                             
2005         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2006         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2007         __ndr_size_dn            : 0x00000022 (34)                                
2008         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2009  */
2010         if (DEBUGLVL(4)) {
2011                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2012         }
2013         
2014         /* decode the target of the link */
2015         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2016                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2017                                        &target,
2018                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2019         if (ndr_err != NDR_ERR_SUCCESS) {
2020                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2021                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2022                 talloc_free(tmp_ctx);
2023                 return LDB_ERR_OPERATIONS_ERROR;
2024         }
2025         if (DEBUGLVL(4)) {
2026                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2027         }
2028
2029         /* construct a modify request for this attribute change */
2030         msg = ldb_msg_new(tmp_ctx);
2031         if (!msg) {
2032                 ldb_oom(ldb);
2033                 talloc_free(tmp_ctx);
2034                 return LDB_ERR_OPERATIONS_ERROR;
2035         }
2036
2037         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2038                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2039         if (ret != LDB_SUCCESS) {
2040                 talloc_free(tmp_ctx);
2041                 return ret;
2042         }
2043
2044         /* find the attribute being modified */
2045         attr = dsdb_attribute_by_attributeID_id(schema, la->attid);
2046         if (attr == NULL) {
2047                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2048                 talloc_free(tmp_ctx);
2049                 return LDB_ERR_OPERATIONS_ERROR;
2050         }
2051
2052         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2053                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2054                                         LDB_FLAG_MOD_ADD, &ret_el);
2055         } else {
2056                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2057                                         LDB_FLAG_MOD_DELETE, &ret_el);
2058         }
2059         if (ret != LDB_SUCCESS) {
2060                 talloc_free(tmp_ctx);
2061                 return ret;
2062         }
2063         /* we allocate two entries here, in case we need a remove/add
2064            pair */
2065         ret_el->values = talloc_array(msg, struct ldb_val, 2);
2066         if (!ret_el->values) {
2067                 ldb_oom(ldb);
2068                 talloc_free(tmp_ctx);
2069                 return LDB_ERR_OPERATIONS_ERROR;
2070         }
2071         ret_el->num_values = 1;
2072
2073         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, GUID_string(tmp_ctx, &target.guid), &target_dn);
2074         if (ret != LDB_SUCCESS) {
2075                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
2076                 talloc_free(tmp_ctx);
2077                 return LDB_ERR_OPERATIONS_ERROR;
2078         }
2079
2080         ret_el->values[0].data = (uint8_t *)ldb_dn_get_extended_linearized(tmp_ctx, target_dn, 1);
2081         ret_el->values[0].length = strlen((char *)ret_el->values[0].data);
2082
2083         ret = replmd_update_rpmd(module, schema, msg, &seq_num);
2084         if (ret != LDB_SUCCESS) {
2085                 talloc_free(tmp_ctx);
2086                 return ret;
2087         }
2088
2089         /* we only change whenChanged and uSNChanged if the seq_num
2090            has changed */
2091         if (seq_num != 0) {
2092                 time_t t = time(NULL);
2093
2094                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
2095                         talloc_free(tmp_ctx);
2096                         return LDB_ERR_OPERATIONS_ERROR;
2097                 }
2098
2099                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
2100                         talloc_free(tmp_ctx);
2101                         return LDB_ERR_OPERATIONS_ERROR;
2102                 }
2103         }
2104
2105         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2106                                 msg,
2107                                 NULL,
2108                                 NULL, 
2109                                 ldb_op_default_callback,
2110                                 NULL);
2111         if (ret != LDB_SUCCESS) {
2112                 talloc_free(tmp_ctx);
2113                 return ret;
2114         }
2115         talloc_steal(mod_req, msg);
2116
2117         if (DEBUGLVL(4)) {
2118                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2119                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2120         }
2121
2122         /* Run the new request */
2123         ret = ldb_next_request(module, mod_req);
2124
2125         /* we need to wait for this to finish, as we are being called
2126            from the synchronous end_transaction hook of this module */
2127         if (ret == LDB_SUCCESS) {
2128                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2129         }
2130
2131         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2132                 /* the link destination exists, we need to update it
2133                  * by deleting the old one for the same DN then adding
2134                  * the new one */
2135                 msg->elements = talloc_realloc(msg, msg->elements,
2136                                                struct ldb_message_element,
2137                                                msg->num_elements+1);
2138                 if (msg->elements == NULL) {
2139                         ldb_oom(ldb);
2140                         talloc_free(tmp_ctx);
2141                         return LDB_ERR_OPERATIONS_ERROR;
2142                 }
2143                 /* this relies on the backend matching the old entry
2144                    only by the DN portion of the extended DN */
2145                 msg->elements[1] = msg->elements[0];
2146                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2147                 msg->num_elements++;
2148
2149                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2150                                         msg,
2151                                         NULL,
2152                                         NULL, 
2153                                         ldb_op_default_callback,
2154                                         NULL);
2155                 if (ret != LDB_SUCCESS) {
2156                         talloc_free(tmp_ctx);
2157                         return ret;
2158                 }
2159
2160                 /* Run the new request */
2161                 ret = ldb_next_request(module, mod_req);
2162                 
2163                 if (ret == LDB_SUCCESS) {
2164                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2165                 }
2166         }
2167
2168         if (ret != LDB_SUCCESS) {
2169                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2170                           ldb_errstring(ldb),
2171                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2172                 ret = LDB_SUCCESS;
2173         }
2174         
2175         talloc_free(tmp_ctx);
2176
2177         return ret;     
2178 }
2179
2180 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2181 {
2182         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2183                 return replmd_extended_replicated_objects(module, req);
2184         }
2185
2186         return ldb_next_request(module, req);
2187 }
2188
2189
2190 /*
2191   we hook into the transaction operations to allow us to 
2192   perform the linked attribute updates at the end of the whole
2193   transaction. This allows a forward linked attribute to be created
2194   before the object is created. During a vampire, w2k8 sends us linked
2195   attributes before the objects they are part of.
2196  */
2197 static int replmd_start_transaction(struct ldb_module *module)
2198 {
2199         /* create our private structure for this transaction */
2200         int i;
2201         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2202                                                                 struct replmd_private);
2203         talloc_free(replmd_private->la_ctx);
2204         replmd_private->la_list = NULL;
2205         replmd_private->la_ctx = NULL;
2206
2207         for (i=0; i<replmd_private->num_ncs; i++) {
2208                 replmd_private->ncs[i].mod_usn = 0;
2209         }
2210
2211         return ldb_next_start_trans(module);
2212 }
2213
2214 /*
2215   on prepare commit we loop over our queued la_context structures and
2216   apply each of them  
2217  */
2218 static int replmd_prepare_commit(struct ldb_module *module)
2219 {
2220         struct replmd_private *replmd_private = 
2221                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2222         struct la_entry *la, *prev;
2223         int ret;
2224
2225         /* walk the list backwards, to do the first entry first, as we
2226          * added the entries with DLIST_ADD() which puts them at the
2227          * start of the list */
2228         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2229
2230         for (; la; la=prev) {
2231                 prev = la->prev;
2232                 DLIST_REMOVE(replmd_private->la_list, la);
2233                 ret = replmd_process_linked_attribute(module, la);
2234                 if (ret != LDB_SUCCESS) {
2235                         talloc_free(replmd_private->la_ctx);
2236                         replmd_private->la_list = NULL;
2237                         replmd_private->la_ctx = NULL;
2238                         return ret;
2239                 }
2240         }
2241
2242         talloc_free(replmd_private->la_ctx);
2243         replmd_private->la_list = NULL;
2244         replmd_private->la_ctx = NULL;
2245
2246         /* possibly change @REPLCHANGED */
2247         ret = replmd_notify_store(module);
2248         if (ret != LDB_SUCCESS) {
2249                 return ret;
2250         }
2251         
2252         return ldb_next_prepare_commit(module);
2253 }
2254
2255 static int replmd_del_transaction(struct ldb_module *module)
2256 {
2257         struct replmd_private *replmd_private = 
2258                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2259         talloc_free(replmd_private->la_ctx);
2260         replmd_private->la_list = NULL;
2261         replmd_private->la_ctx = NULL;
2262         return ldb_next_del_trans(module);
2263 }
2264
2265
2266 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2267         .name          = "repl_meta_data",
2268         .init_context      = replmd_init,
2269         .add               = replmd_add,
2270         .modify            = replmd_modify,
2271         .rename            = replmd_rename,
2272         .extended          = replmd_extended,
2273         .start_transaction = replmd_start_transaction,
2274         .prepare_commit    = replmd_prepare_commit,
2275         .del_transaction   = replmd_del_transaction,
2276 };