s4-dsdb: split RMD_USN into RMD_LOCAL_USN and RMD_ORIGINATING_USN
[amitay/samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48 #include "dsdb/samdb/ldb_modules/util.h"
49 #include "lib/util/binsearch.h"
50
/*
 * Compile-time switch.  When non-zero, replmd_add() routes attributes
 * with a non-zero linkID through replmd_add_fix_la() and keeps them out
 * of replPropertyMetaData, provided the functional level is above
 * DS_DOMAIN_FUNCTION_2000.
 */
#define W2K3_LINKED_ATTRIBUTES 1
52
53 struct replmd_private {
54         TALLOC_CTX *la_ctx;
55         struct la_entry *la_list;
56         TALLOC_CTX *bl_ctx;
57         struct la_backlink *la_backlinks;
58         struct nc_entry {
59                 struct nc_entry *prev, *next;
60                 struct ldb_dn *dn;
61                 uint64_t mod_usn;
62         } *ncs;
63 };
64
/*
 * Doubly linked list node carrying a pointer to one
 * drsuapi_DsReplicaLinkedAttribute.
 */
struct la_entry {
        struct la_entry *next;
        struct la_entry *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
69
/*
 * Per-request state, created by replmd_ctx_init() as a talloc child of
 * the request it wraps.
 */
struct replmd_replicated_request {
        /* the module handling the request, and the original request */
        struct ldb_module *module;
        struct ldb_request *req;

        /* schema looked up when the context was created */
        const struct dsdb_schema *schema;

        /* the controls we pass down */
        struct ldb_control **controls;

        /*
         * details for the mode where we apply a bunch of inbound
         * replication messages
         */
        bool apply_mode;
        uint32_t index_current;
        struct dsdb_extended_replicated_objects *objs;

        struct ldb_message *search_msg;

        /*
         * sequence number used for this operation; replmd_op_callback()
         * records it against the modified partition when it is non-zero
         */
        uint64_t seq_num;
};
89
/* forward declaration: replmd_op_callback() calls this in apply mode */
static int
replmd_replicated_apply_next(struct replmd_replicated_request *ar);
91
92
93 /*
94   initialise the module
95   allocate the private structure and build the list
96   of partition DNs for use by replmd_notify()
97  */
98 static int replmd_init(struct ldb_module *module)
99 {
100         struct replmd_private *replmd_private;
101         struct ldb_context *ldb = ldb_module_get_ctx(module);
102
103         replmd_private = talloc_zero(module, struct replmd_private);
104         if (replmd_private == NULL) {
105                 ldb_oom(ldb);
106                 return LDB_ERR_OPERATIONS_ERROR;
107         }
108         ldb_module_set_private(module, replmd_private);
109
110         return ldb_next_init(module);
111 }
112
113 /*
114   cleanup our per-transaction contexts
115  */
116 static void replmd_txn_cleanup(struct replmd_private *replmd_private)
117 {
118         talloc_free(replmd_private->la_ctx);
119         replmd_private->la_list = NULL;
120         replmd_private->la_ctx = NULL;
121
122         talloc_free(replmd_private->bl_ctx);
123         replmd_private->la_backlinks = NULL;
124         replmd_private->bl_ctx = NULL;
125 }
126
127
128 struct la_backlink {
129         struct la_backlink *next, *prev;
130         const char *attr_name;
131         struct GUID forward_guid, target_guid;
132         bool active;
133 };
134
135 /*
136   process a backlinks we accumulated during a transaction, adding and
137   deleting the backlinks from the target objects
138  */
139 static int replmd_process_backlink(struct ldb_module *module, struct la_backlink *bl)
140 {
141         struct ldb_dn *target_dn, *source_dn;
142         int ret;
143         struct ldb_context *ldb = ldb_module_get_ctx(module);
144         struct ldb_message *msg;
145         TALLOC_CTX *tmp_ctx = talloc_new(bl);
146         char *dn_string;
147
148         /*
149           - find DN of target
150           - find DN of source
151           - construct ldb_message
152               - either an add or a delete
153          */
154         ret = dsdb_module_dn_by_guid(module, tmp_ctx, &bl->target_guid, &target_dn);
155         if (ret != LDB_SUCCESS) {
156                 ldb_asprintf_errstring(ldb, "Failed to find target DN for linked attribute with GUID %s\n",
157                                        GUID_string(bl, &bl->target_guid));
158                 talloc_free(tmp_ctx);
159                 return ret;
160         }
161
162         ret = dsdb_module_dn_by_guid(module, tmp_ctx, &bl->forward_guid, &source_dn);
163         if (ret != LDB_SUCCESS) {
164                 ldb_asprintf_errstring(ldb, "Failed to find source DN for linked attribute with GUID %s\n",
165                                        GUID_string(bl, &bl->forward_guid));
166                 talloc_free(tmp_ctx);
167                 return ret;
168         }
169
170         msg = ldb_msg_new(tmp_ctx);
171         if (msg == NULL) {
172                 ldb_module_oom(module);
173                 talloc_free(tmp_ctx);
174                 return LDB_ERR_OPERATIONS_ERROR;
175         }
176
177         /* construct a ldb_message for adding/deleting the backlink */
178         msg->dn = target_dn;
179         dn_string = ldb_dn_get_extended_linearized(tmp_ctx, source_dn, 1);
180         if (!dn_string) {
181                 ldb_module_oom(module);
182                 talloc_free(tmp_ctx);
183                 return LDB_ERR_OPERATIONS_ERROR;
184         }
185         ret = ldb_msg_add_steal_string(msg, bl->attr_name, dn_string);
186         if (ret != LDB_SUCCESS) {
187                 talloc_free(tmp_ctx);
188                 return ret;
189         }
190         msg->elements[0].flags = bl->active?LDB_FLAG_MOD_ADD:LDB_FLAG_MOD_DELETE;
191
192         ret = dsdb_module_modify(module, msg, 0);
193         if (ret != LDB_SUCCESS) {
194                 ldb_asprintf_errstring(ldb, "Failed to %s backlink from %s to %s - %s",
195                                        bl->active?"add":"remove",
196                                        ldb_dn_get_linearized(source_dn),
197                                        ldb_dn_get_linearized(target_dn),
198                                        ldb_errstring(ldb));
199                 talloc_free(tmp_ctx);
200                 return ret;
201         }
202         talloc_free(tmp_ctx);
203         return ret;
204 }
205
206 /*
207   add a backlink to the list of backlinks to add/delete in the prepare
208   commit
209  */
210 static int replmd_add_backlink(struct ldb_module *module, const struct dsdb_schema *schema,
211                                struct GUID *forward_guid, struct GUID *target_guid,
212                                bool active, const struct dsdb_attribute *schema_attr, bool immediate)
213 {
214         const struct dsdb_attribute *target_attr;
215         struct la_backlink *bl;
216         struct replmd_private *replmd_private =
217                 talloc_get_type_abort(ldb_module_get_private(module), struct replmd_private);
218
219         target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID + 1);
220         if (!target_attr) {
221                 /*
222                  * windows 2003 has a broken schema where the
223                  * definition of msDS-IsDomainFor is missing (which is
224                  * supposed to be the backlink of the
225                  * msDS-HasDomainNCs attribute
226                  */
227                 return LDB_SUCCESS;
228         }
229
230         /* see if its already in the list */
231         for (bl=replmd_private->la_backlinks; bl; bl=bl->next) {
232                 if (GUID_equal(forward_guid, &bl->forward_guid) &&
233                     GUID_equal(target_guid, &bl->target_guid) &&
234                     (target_attr->lDAPDisplayName == bl->attr_name ||
235                      strcmp(target_attr->lDAPDisplayName, bl->attr_name) == 0)) {
236                         break;
237                 }
238         }
239
240         if (bl) {
241                 /* we found an existing one */
242                 if (bl->active == active) {
243                         return LDB_SUCCESS;
244                 }
245                 DLIST_REMOVE(replmd_private->la_backlinks, bl);
246                 talloc_free(bl);
247                 return LDB_SUCCESS;
248         }
249
250         if (replmd_private->bl_ctx == NULL) {
251                 replmd_private->bl_ctx = talloc_new(replmd_private);
252                 if (replmd_private->bl_ctx == NULL) {
253                         ldb_module_oom(module);
254                         return LDB_ERR_OPERATIONS_ERROR;
255                 }
256         }
257
258         /* its a new one */
259         bl = talloc(replmd_private->bl_ctx, struct la_backlink);
260         if (bl == NULL) {
261                 ldb_module_oom(module);
262                 return LDB_ERR_OPERATIONS_ERROR;
263         }
264
265         bl->attr_name = target_attr->lDAPDisplayName;
266         bl->forward_guid = *forward_guid;
267         bl->target_guid = *target_guid;
268         bl->active = active;
269
270         /* the caller may ask for this backlink to be processed
271            immediately */
272         if (immediate) {
273                 int ret = replmd_process_backlink(module, bl);
274                 talloc_free(bl);
275                 return ret;
276         }
277
278         DLIST_ADD(replmd_private->la_backlinks, bl);
279
280         return LDB_SUCCESS;
281 }
282
283
284 /*
285  * Callback for most write operations in this module:
286  * 
287  * notify the repl task that a object has changed. The notifies are
288  * gathered up in the replmd_private structure then written to the
289  * @REPLCHANGED object in each partition during the prepare_commit
290  */
291 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
292 {
293         int ret;
294         struct replmd_replicated_request *ac = 
295                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
296         struct replmd_private *replmd_private = 
297                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
298         struct nc_entry *modified_partition;
299         struct ldb_control *partition_ctrl;
300         const struct dsdb_control_current_partition *partition;
301
302         struct ldb_control **controls;
303
304         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
305
306         /* Remove the 'partition' control from what we pass up the chain */
307         controls = controls_except_specified(ares->controls, ares, partition_ctrl);
308
309         if (ares->error != LDB_SUCCESS) {
310                 return ldb_module_done(ac->req, controls,
311                                         ares->response, ares->error);
312         }
313
314         if (ares->type != LDB_REPLY_DONE) {
315                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
316                 return ldb_module_done(ac->req, NULL,
317                                        NULL, LDB_ERR_OPERATIONS_ERROR);
318         }
319
320         if (!partition_ctrl) {
321                 return ldb_module_done(ac->req, NULL,
322                                        NULL, LDB_ERR_OPERATIONS_ERROR);
323         }
324
325         partition = talloc_get_type_abort(partition_ctrl->data,
326                                     struct dsdb_control_current_partition);
327         
328         if (ac->seq_num > 0) {
329                 for (modified_partition = replmd_private->ncs; modified_partition; 
330                      modified_partition = modified_partition->next) {
331                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
332                                 break;
333                         }
334                 }
335                 
336                 if (modified_partition == NULL) {
337                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
338                         if (!modified_partition) {
339                                 ldb_oom(ldb_module_get_ctx(ac->module));
340                                 return ldb_module_done(ac->req, NULL,
341                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
342                         }
343                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
344                         if (!modified_partition->dn) {
345                                 ldb_oom(ldb_module_get_ctx(ac->module));
346                                 return ldb_module_done(ac->req, NULL,
347                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
348                         }
349                         DLIST_ADD(replmd_private->ncs, modified_partition);
350                 }
351
352                 if (ac->seq_num > modified_partition->mod_usn) {
353                         modified_partition->mod_usn = ac->seq_num;
354                 }
355         }
356
357         if (ac->apply_mode) {
358                 talloc_free(ares);
359                 ac->index_current++;
360                 
361                 ret = replmd_replicated_apply_next(ac);
362                 if (ret != LDB_SUCCESS) {
363                         return ldb_module_done(ac->req, NULL, NULL, ret);
364                 }
365                 return ret;
366         } else {
367                 /* free the partition control container here, for the
368                  * common path.  Other cases will have it cleaned up
369                  * eventually with the ares */
370                 talloc_free(partition_ctrl);
371                 return ldb_module_done(ac->req, 
372                                        controls_except_specified(controls, ares, partition_ctrl),
373                                        ares->response, LDB_SUCCESS);
374         }
375 }
376
377
378 /*
379  * update a @REPLCHANGED record in each partition if there have been
380  * any writes of replicated data in the partition
381  */
382 static int replmd_notify_store(struct ldb_module *module)
383 {
384         struct replmd_private *replmd_private = 
385                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
386         struct ldb_context *ldb = ldb_module_get_ctx(module);
387
388         while (replmd_private->ncs) {
389                 int ret;
390                 struct nc_entry *modified_partition = replmd_private->ncs;
391
392                 ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
393                 if (ret != LDB_SUCCESS) {
394                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
395                                  ldb_dn_get_linearized(modified_partition->dn)));
396                         return ret;
397                 }
398                 DLIST_REMOVE(replmd_private->ncs, modified_partition);
399                 talloc_free(modified_partition);
400         }
401
402         return LDB_SUCCESS;
403 }
404
405
406 /*
407   created a replmd_replicated_request context
408  */
409 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
410                                                          struct ldb_request *req)
411 {
412         struct ldb_context *ldb;
413         struct replmd_replicated_request *ac;
414
415         ldb = ldb_module_get_ctx(module);
416
417         ac = talloc_zero(req, struct replmd_replicated_request);
418         if (ac == NULL) {
419                 ldb_oom(ldb);
420                 return NULL;
421         }
422
423         ac->module = module;
424         ac->req = req;
425
426         ac->schema = dsdb_get_schema(ldb);
427         if (!ac->schema) {
428                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
429                               "replmd_modify: no dsdb_schema loaded");
430                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
431                 return NULL;
432         }
433
434         return ac;
435 }
436
437 /*
438   add a time element to a record
439 */
440 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
441 {
442         struct ldb_message_element *el;
443         char *s;
444
445         if (ldb_msg_find_element(msg, attr) != NULL) {
446                 return LDB_SUCCESS;
447         }
448
449         s = ldb_timestring(msg, t);
450         if (s == NULL) {
451                 return LDB_ERR_OPERATIONS_ERROR;
452         }
453
454         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
455                 return LDB_ERR_OPERATIONS_ERROR;
456         }
457
458         el = ldb_msg_find_element(msg, attr);
459         /* always set as replace. This works because on add ops, the flag
460            is ignored */
461         el->flags = LDB_FLAG_MOD_REPLACE;
462
463         return LDB_SUCCESS;
464 }
465
466 /*
467   add a uint64_t element to a record
468 */
469 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
470 {
471         struct ldb_message_element *el;
472
473         if (ldb_msg_find_element(msg, attr) != NULL) {
474                 return LDB_SUCCESS;
475         }
476
477         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
478                 return LDB_ERR_OPERATIONS_ERROR;
479         }
480
481         el = ldb_msg_find_element(msg, attr);
482         /* always set as replace. This works because on add ops, the flag
483            is ignored */
484         el->flags = LDB_FLAG_MOD_REPLACE;
485
486         return LDB_SUCCESS;
487 }
488
489 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
490                                                    const struct replPropertyMetaData1 *m2,
491                                                    const uint32_t *rdn_attid)
492 {
493         if (m1->attid == m2->attid) {
494                 return 0;
495         }
496
497         /*
498          * the rdn attribute should be at the end!
499          * so we need to return a value greater than zero
500          * which means m1 is greater than m2
501          */
502         if (m1->attid == *rdn_attid) {
503                 return 1;
504         }
505
506         /*
507          * the rdn attribute should be at the end!
508          * so we need to return a value less than zero
509          * which means m2 is greater than m1
510          */
511         if (m2->attid == *rdn_attid) {
512                 return -1;
513         }
514
515         return m1->attid > m2->attid ? 1 : -1;
516 }
517
518 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
519                                                 const struct dsdb_schema *schema,
520                                                 struct ldb_dn *dn)
521 {
522         const char *rdn_name;
523         const struct dsdb_attribute *rdn_sa;
524
525         rdn_name = ldb_dn_get_rdn_name(dn);
526         if (!rdn_name) {
527                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
528                 return LDB_ERR_OPERATIONS_ERROR;
529         }
530
531         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
532         if (rdn_sa == NULL) {
533                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
534                 return LDB_ERR_OPERATIONS_ERROR;                
535         }
536
537         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
538                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
539
540         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
541                   discard_const_p(void, &rdn_sa->attributeID_id), 
542                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
543
544         return LDB_SUCCESS;
545 }
546
547 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
548                                                  const struct ldb_message_element *e2,
549                                                  const struct dsdb_schema *schema)
550 {
551         const struct dsdb_attribute *a1;
552         const struct dsdb_attribute *a2;
553
554         /* 
555          * TODO: make this faster by caching the dsdb_attribute pointer
556          *       on the ldb_messag_element
557          */
558
559         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
560         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
561
562         /*
563          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
564          *       in the schema
565          */
566         if (!a1 || !a2) {
567                 return strcasecmp(e1->name, e2->name);
568         }
569         if (a1->attributeID_id == a2->attributeID_id) {
570                 return 0;
571         }
572         return a1->attributeID_id > a2->attributeID_id ? 1 : -1;
573 }
574
575 static void replmd_ldb_message_sort(struct ldb_message *msg,
576                                     const struct dsdb_schema *schema)
577 {
578         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
579                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
580 }
581
582 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
583                                const struct GUID *invocation_id, uint64_t seq_num, time_t t);
584
585 /*
586   fix up linked attributes in replmd_add.
587   This involves setting up the right meta-data in extended DN
588   components, and creating backlinks to the object
589  */
590 static int replmd_add_fix_la(struct ldb_module *module, struct ldb_message_element *el,
591                              uint64_t seq_num, const struct GUID *invocationId, time_t t,
592                              struct GUID *guid, const struct dsdb_attribute *sa)
593 {
594         int i;
595         TALLOC_CTX *tmp_ctx = talloc_new(el->values);
596         struct ldb_context *ldb = ldb_module_get_ctx(module);
597         struct dsdb_schema *schema = dsdb_get_schema(ldb);
598
599         for (i=0; i<el->num_values; i++) {
600                 struct ldb_val *v = &el->values[i];
601                 struct dsdb_dn *dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, v, sa->syntax->ldap_oid);
602                 struct GUID target_guid;
603                 NTSTATUS status;
604                 int ret;
605
606                 ret = replmd_build_la_val(el->values, v, dsdb_dn, invocationId, seq_num, t);
607                 if (ret != LDB_SUCCESS) {
608                         talloc_free(tmp_ctx);
609                         return ret;
610                 }
611
612                 /* note that the DN already has the extended
613                    components from the extended_dn_store module */
614                 status = dsdb_get_extended_dn_guid(dsdb_dn->dn, &target_guid);
615                 if (!NT_STATUS_IS_OK(status)) {
616                         talloc_free(tmp_ctx);
617                         return LDB_ERR_OPERATIONS_ERROR;
618                 }
619
620                 ret = replmd_add_backlink(module, schema, guid, &target_guid, true, sa, false);
621                 if (ret != LDB_SUCCESS) {
622                         talloc_free(tmp_ctx);
623                         return ret;
624                 }
625         }
626
627         talloc_free(tmp_ctx);
628         return LDB_SUCCESS;
629 }
630
631
632 /*
633   intercept add requests
634  */
635 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
636 {
637         struct ldb_context *ldb;
638         struct ldb_control *control;
639         struct replmd_replicated_request *ac;
640         enum ndr_err_code ndr_err;
641         struct ldb_request *down_req;
642         struct ldb_message *msg;
643         const DATA_BLOB *guid_blob;
644         struct GUID guid;
645         struct replPropertyMetaDataBlob nmd;
646         struct ldb_val nmd_value;
647         const struct GUID *our_invocation_id;
648         time_t t = time(NULL);
649         NTTIME now;
650         char *time_str;
651         int ret;
652         uint32_t i, ni=0;
653         bool allow_add_guid = false;
654         bool remove_current_guid = false;
655
656         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
657         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
658         if (control) {
659                 allow_add_guid = 1;
660         }
661
662         /* do not manipulate our control entries */
663         if (ldb_dn_is_special(req->op.add.message->dn)) {
664                 return ldb_next_request(module, req);
665         }
666
667         ldb = ldb_module_get_ctx(module);
668
669         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
670
671         ac = replmd_ctx_init(module, req);
672         if (!ac) {
673                 return LDB_ERR_OPERATIONS_ERROR;
674         }
675
676         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
677         if ( guid_blob != NULL ) {
678                 if( !allow_add_guid ) {
679                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
680                               "replmd_add: it's not allowed to add an object with objectGUID\n");
681                         talloc_free(ac);
682                         return LDB_ERR_UNWILLING_TO_PERFORM;
683                 } else {
684                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
685                         if ( !NT_STATUS_IS_OK(status)) {
686                                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
687                                       "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
688                                 talloc_free(ac);
689                                 return LDB_ERR_UNWILLING_TO_PERFORM;
690                         }
691                         /* we remove this attribute as it can be a string and will not be treated 
692                         correctly and then we will readd it latter on in the good format*/
693                         remove_current_guid = true;
694                 }
695         } else {
696                 /* a new GUID */
697                 guid = GUID_random();
698         }
699
700         /* Get a sequence number from the backend */
701         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
702         if (ret != LDB_SUCCESS) {
703                 talloc_free(ac);
704                 return ret;
705         }
706
707         /* get our invocationId */
708         our_invocation_id = samdb_ntds_invocation_id(ldb);
709         if (!our_invocation_id) {
710                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
711                               "replmd_add: unable to find invocationId\n");
712                 talloc_free(ac);
713                 return LDB_ERR_OPERATIONS_ERROR;
714         }
715
716         /* we have to copy the message as the caller might have it as a const */
717         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
718         if (msg == NULL) {
719                 ldb_oom(ldb);
720                 talloc_free(ac);
721                 return LDB_ERR_OPERATIONS_ERROR;
722         }
723
724         /* generated times */
725         unix_to_nt_time(&now, t);
726         time_str = ldb_timestring(msg, t);
727         if (!time_str) {
728                 ldb_oom(ldb);
729                 talloc_free(ac);
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         if (remove_current_guid) {
733                 ldb_msg_remove_attr(msg,"objectGUID");
734         }
735
736         /* 
737          * remove autogenerated attributes
738          */
739         ldb_msg_remove_attr(msg, "whenCreated");
740         ldb_msg_remove_attr(msg, "whenChanged");
741         ldb_msg_remove_attr(msg, "uSNCreated");
742         ldb_msg_remove_attr(msg, "uSNChanged");
743         ldb_msg_remove_attr(msg, "replPropertyMetaData");
744
745         /*
746          * readd replicated attributes
747          */
748         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
749         if (ret != LDB_SUCCESS) {
750                 ldb_oom(ldb);
751                 talloc_free(ac);
752                 return ret;
753         }
754
755         /* build the replication meta_data */
756         ZERO_STRUCT(nmd);
757         nmd.version             = 1;
758         nmd.ctr.ctr1.count      = msg->num_elements;
759         nmd.ctr.ctr1.array      = talloc_array(msg,
760                                                struct replPropertyMetaData1,
761                                                nmd.ctr.ctr1.count);
762         if (!nmd.ctr.ctr1.array) {
763                 ldb_oom(ldb);
764                 talloc_free(ac);
765                 return LDB_ERR_OPERATIONS_ERROR;
766         }
767
768         for (i=0; i < msg->num_elements; i++) {
769                 struct ldb_message_element *e = &msg->elements[i];
770                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
771                 const struct dsdb_attribute *sa;
772
773                 if (e->name[0] == '@') continue;
774
775                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
776                 if (!sa) {
777                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
778                                       "replmd_add: attribute '%s' not defined in schema\n",
779                                       e->name);
780                         talloc_free(ac);
781                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
782                 }
783
784                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
785                         /* if the attribute is not replicated (0x00000001)
786                          * or constructed (0x00000004) it has no metadata
787                          */
788                         continue;
789                 }
790
791 #if W2K3_LINKED_ATTRIBUTES
792                 if (sa->linkID != 0 && dsdb_functional_level(ldb) > DS_DOMAIN_FUNCTION_2000) {
793                         ret = replmd_add_fix_la(module, e, ac->seq_num, our_invocation_id, t, &guid, sa);
794                         if (ret != LDB_SUCCESS) {
795                                 talloc_free(ac);
796                                 return ret;
797                         }
798                         /* linked attributes are not stored in
799                            replPropertyMetaData in FL above w2k */
800                         continue;
801                 }
802 #endif
803
804                 m->attid                        = sa->attributeID_id;
805                 m->version                      = 1;
806                 m->originating_change_time      = now;
807                 m->originating_invocation_id    = *our_invocation_id;
808                 m->originating_usn              = ac->seq_num;
809                 m->local_usn                    = ac->seq_num;
810                 ni++;
811         }
812
813         /* fix meta data count */
814         nmd.ctr.ctr1.count = ni;
815
816         /*
817          * sort meta data array, and move the rdn attribute entry to the end
818          */
819         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
820         if (ret != LDB_SUCCESS) {
821                 talloc_free(ac);
822                 return ret;
823         }
824
825         /* generated NDR encoded values */
826         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
827                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
828                                        &nmd,
829                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
830         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
831                 ldb_oom(ldb);
832                 talloc_free(ac);
833                 return LDB_ERR_OPERATIONS_ERROR;
834         }
835
836         /*
837          * add the autogenerated values
838          */
839         ret = dsdb_msg_add_guid(msg, &guid, "objectGUID");
840         if (ret != LDB_SUCCESS) {
841                 ldb_oom(ldb);
842                 talloc_free(ac);
843                 return ret;
844         }
845         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
846         if (ret != LDB_SUCCESS) {
847                 ldb_oom(ldb);
848                 talloc_free(ac);
849                 return ret;
850         }
851         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
852         if (ret != LDB_SUCCESS) {
853                 ldb_oom(ldb);
854                 talloc_free(ac);
855                 return ret;
856         }
857         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
858         if (ret != LDB_SUCCESS) {
859                 ldb_oom(ldb);
860                 talloc_free(ac);
861                 return ret;
862         }
863         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
864         if (ret != LDB_SUCCESS) {
865                 ldb_oom(ldb);
866                 talloc_free(ac);
867                 return ret;
868         }
869
870         /*
871          * sort the attributes by attid before storing the object
872          */
873         replmd_ldb_message_sort(msg, ac->schema);
874
875         ret = ldb_build_add_req(&down_req, ldb, ac,
876                                 msg,
877                                 req->controls,
878                                 ac, replmd_op_callback,
879                                 req);
880         if (ret != LDB_SUCCESS) {
881                 talloc_free(ac);
882                 return ret;
883         }
884
885         /* mark the control done */
886         if (control) {
887                 control->critical = 0;
888         }
889
890         /* go on with the call chain */
891         return ldb_next_request(module, down_req);
892 }
893
894
895 /*
896  * update the replPropertyMetaData for one element
897  */
898 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
899                                       struct ldb_message *msg,
900                                       struct ldb_message_element *el,
901                                       struct replPropertyMetaDataBlob *omd,
902                                       const struct dsdb_schema *schema,
903                                       uint64_t *seq_num,
904                                       const struct GUID *our_invocation_id,
905                                       NTTIME now)
906 {
907         int i;
908         const struct dsdb_attribute *a;
909         struct replPropertyMetaData1 *md1;
910
911         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
912         if (a == NULL) {
913                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
914                          el->name));
915                 return LDB_ERR_OPERATIONS_ERROR;
916         }
917
918         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
919                 return LDB_SUCCESS;
920         }
921
922         for (i=0; i<omd->ctr.ctr1.count; i++) {
923                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
924         }
925
926 #if W2K3_LINKED_ATTRIBUTES
927         if (a->linkID != 0 && dsdb_functional_level(ldb) > DS_DOMAIN_FUNCTION_2000) {
928                 /* linked attributes are not stored in
929                    replPropertyMetaData in FL above w2k, but we do
930                    raise the seqnum for the object  */
931                 if (*seq_num == 0 &&
932                     ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num) != LDB_SUCCESS) {
933                         return LDB_ERR_OPERATIONS_ERROR;
934                 }
935                 return LDB_SUCCESS;
936         }
937 #endif
938
939         if (i == omd->ctr.ctr1.count) {
940                 /* we need to add a new one */
941                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
942                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
943                 if (omd->ctr.ctr1.array == NULL) {
944                         ldb_oom(ldb);
945                         return LDB_ERR_OPERATIONS_ERROR;
946                 }
947                 omd->ctr.ctr1.count++;
948                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
949         }
950
951         /* Get a new sequence number from the backend. We only do this
952          * if we have a change that requires a new
953          * replPropertyMetaData element 
954          */
955         if (*seq_num == 0) {
956                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
957                 if (ret != LDB_SUCCESS) {
958                         return LDB_ERR_OPERATIONS_ERROR;
959                 }
960         }
961
962         md1 = &omd->ctr.ctr1.array[i];
963         md1->version++;
964         md1->attid                     = a->attributeID_id;
965         md1->originating_change_time   = now;
966         md1->originating_invocation_id = *our_invocation_id;
967         md1->originating_usn           = *seq_num;
968         md1->local_usn                 = *seq_num;
969         
970         return LDB_SUCCESS;
971 }
972
973 /*
974  * update the replPropertyMetaData object each time we modify an
975  * object. This is needed for DRS replication, as the merge on the
976  * client is based on this object 
977  */
978 static int replmd_update_rpmd(struct ldb_module *module, 
979                               const struct dsdb_schema *schema, 
980                               struct ldb_message *msg, uint64_t *seq_num,
981                               time_t t)
982 {
983         const struct ldb_val *omd_value;
984         enum ndr_err_code ndr_err;
985         struct replPropertyMetaDataBlob omd;
986         int i;
987         NTTIME now;
988         const struct GUID *our_invocation_id;
989         int ret;
990         const char *attrs[] = { "replPropertyMetaData" , NULL };
991         struct ldb_result *res;
992         struct ldb_context *ldb;
993
994         ldb = ldb_module_get_ctx(module);
995
996         our_invocation_id = samdb_ntds_invocation_id(ldb);
997         if (!our_invocation_id) {
998                 /* this happens during an initial vampire while
999                    updating the schema */
1000                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
1001                 return LDB_SUCCESS;
1002         }
1003
1004         unix_to_nt_time(&now, t);
1005
1006         /* search for the existing replPropertyMetaDataBlob */
1007         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
1008         if (ret != LDB_SUCCESS || res->count != 1) {
1009                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
1010                          ldb_dn_get_linearized(msg->dn)));
1011                 return LDB_ERR_OPERATIONS_ERROR;
1012         }
1013                 
1014
1015         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
1016         if (!omd_value) {
1017                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
1018                          ldb_dn_get_linearized(msg->dn)));
1019                 return LDB_ERR_OPERATIONS_ERROR;
1020         }
1021
1022         ndr_err = ndr_pull_struct_blob(omd_value, msg,
1023                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1024                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1025         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1026                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
1027                          ldb_dn_get_linearized(msg->dn)));
1028                 return LDB_ERR_OPERATIONS_ERROR;
1029         }
1030
1031         if (omd.version != 1) {
1032                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
1033                          omd.version, ldb_dn_get_linearized(msg->dn)));
1034                 return LDB_ERR_OPERATIONS_ERROR;
1035         }
1036
1037         for (i=0; i<msg->num_elements; i++) {
1038                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
1039                                                  our_invocation_id, now);
1040                 if (ret != LDB_SUCCESS) {
1041                         return ret;
1042                 }
1043         }
1044
1045         /*
1046          * replmd_update_rpmd_element has done an update if the
1047          * seq_num is set
1048          */
1049         if (*seq_num != 0) {
1050                 struct ldb_val *md_value;
1051                 struct ldb_message_element *el;
1052
1053                 md_value = talloc(msg, struct ldb_val);
1054                 if (md_value == NULL) {
1055                         ldb_oom(ldb);
1056                         return LDB_ERR_OPERATIONS_ERROR;
1057                 }
1058
1059                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
1060                 if (ret != LDB_SUCCESS) {
1061                         return ret;
1062                 }
1063
1064                 ndr_err = ndr_push_struct_blob(md_value, msg, 
1065                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1066                                                &omd,
1067                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1068                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1069                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
1070                                  ldb_dn_get_linearized(msg->dn)));
1071                         return LDB_ERR_OPERATIONS_ERROR;
1072                 }
1073
1074                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
1075                 if (ret != LDB_SUCCESS) {
1076                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
1077                                  ldb_dn_get_linearized(msg->dn)));
1078                         return ret;
1079                 }
1080
1081                 el->num_values = 1;
1082                 el->values = md_value;
1083         }
1084
1085         return LDB_SUCCESS;     
1086 }
1087
1088
/*
 * One linked attribute value in parsed form, as produced by
 * get_parsed_dns().
 */
struct parsed_dn {
	/* the value parsed with dsdb_dn_parse() */
	struct dsdb_dn *dsdb_dn;
	/* GUID of the DN; this is the key the array is sorted and searched by */
	struct GUID *guid;
	/* the original ldb_val this entry was parsed from */
	struct ldb_val *v;
};
1094
1095 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
1096 {
1097         return GUID_compare(pdn1->guid, pdn2->guid);
1098 }
1099
1100 static struct parsed_dn *parsed_dn_find(struct parsed_dn *pdn, int count, struct GUID *guid)
1101 {
1102         struct parsed_dn *ret;
1103         BINARY_ARRAY_SEARCH(pdn, count, guid, guid, GUID_compare, ret);
1104         return ret;
1105 }
1106
1107 /*
1108   get a series of message element values as an array of DNs and GUIDs
1109   the result is sorted by GUID
1110  */
1111 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1112                           struct ldb_message_element *el, struct parsed_dn **pdn,
1113                           const char *ldap_oid)
1114 {
1115         int i;
1116         struct ldb_context *ldb = ldb_module_get_ctx(module);
1117
1118         if (el == NULL) {
1119                 *pdn = NULL;
1120                 return LDB_SUCCESS;
1121         }
1122
1123         (*pdn) = talloc_array(mem_ctx, struct parsed_dn, el->num_values);
1124         if (!*pdn) {
1125                 ldb_module_oom(module);
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128
1129         for (i=0; i<el->num_values; i++) {
1130                 struct ldb_val *v = &el->values[i];
1131                 NTSTATUS status;
1132                 struct ldb_dn *dn;
1133                 struct parsed_dn *p;
1134
1135                 p = &(*pdn)[i];
1136
1137                 p->dsdb_dn = dsdb_dn_parse(*pdn, ldb, v, ldap_oid);
1138                 if (p->dsdb_dn == NULL) {
1139                         return LDB_ERR_INVALID_DN_SYNTAX;
1140                 }
1141
1142                 dn = p->dsdb_dn->dn;
1143
1144                 p->guid = talloc(*pdn, struct GUID);
1145                 if (p->guid == NULL) {
1146                         ldb_module_oom(module);
1147                         return LDB_ERR_OPERATIONS_ERROR;
1148                 }
1149
1150                 status = dsdb_get_extended_dn_guid(dn, p->guid);
1151                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
1152                         /* we got a DN without a GUID - go find the GUID */
1153                         int ret = dsdb_find_guid_by_dn(ldb, dn, p->guid);
1154                         if (ret != LDB_SUCCESS) {
1155                                 ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
1156                                                        ldb_dn_get_linearized(dn));
1157                                 return ret;
1158                         }
1159                 } else if (!NT_STATUS_IS_OK(status)) {
1160                         return LDB_ERR_OPERATIONS_ERROR;
1161                 }
1162
1163                 /* keep a pointer to the original ldb_val */
1164                 p->v = v;
1165         }
1166
1167         qsort(*pdn, el->num_values, sizeof((*pdn)[0]), (comparison_fn_t)parsed_dn_compare);
1168
1169         return LDB_SUCCESS;
1170 }
1171
1172 /*
1173   build a new extended DN, including all meta data fields
1174
1175   DELETED             = 1 or missing
1176   RMD_ADDTIME         = originating_add_time
1177   RMD_INVOCID         = originating_invocation_id
1178   RMD_CHANGETIME      = originating_change_time
1179   RMD_ORIGINATING_USN = originating_usn
1180   RMD_LOCAL_USN       = local_usn
1181   RMD_VERSION         = version
1182  */
1183 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
1184                                const struct GUID *invocation_id, uint64_t seq_num, time_t t)
1185 {
1186         struct ldb_dn *dn = dsdb_dn->dn;
1187         NTTIME now;
1188         const char *tstring, *usn_string;
1189         struct ldb_val tval;
1190         struct ldb_val iid;
1191         struct ldb_val usnv;
1192         struct ldb_val vers;
1193         NTSTATUS status;
1194         int ret;
1195         const char *dnstring;
1196
1197         unix_to_nt_time(&now, t);
1198         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)now);
1199         if (!tstring) {
1200                 return LDB_ERR_OPERATIONS_ERROR;
1201         }
1202         tval = data_blob_string_const(tstring);
1203
1204         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
1205         if (!usn_string) {
1206                 return LDB_ERR_OPERATIONS_ERROR;
1207         }
1208         usnv = data_blob_string_const(usn_string);
1209
1210         vers = data_blob_string_const("0");
1211
1212         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
1213         if (!NT_STATUS_IS_OK(status)) {
1214                 return LDB_ERR_OPERATIONS_ERROR;
1215         }
1216
1217         ret = ldb_dn_set_extended_component(dn, "DELETED", NULL);
1218         if (ret != LDB_SUCCESS) return ret;
1219         ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
1220         if (ret != LDB_SUCCESS) return ret;
1221         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
1222         if (ret != LDB_SUCCESS) return ret;
1223         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
1224         if (ret != LDB_SUCCESS) return ret;
1225         ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &usnv);
1226         if (ret != LDB_SUCCESS) return ret;
1227         ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
1228         if (ret != LDB_SUCCESS) return ret;
1229         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
1230         if (ret != LDB_SUCCESS) return ret;
1231
1232         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
1233         if (dnstring == NULL) {
1234                 return LDB_ERR_OPERATIONS_ERROR;
1235         }
1236         *v = data_blob_string_const(dnstring);
1237
1238         return LDB_SUCCESS;
1239 }
1240
1241
1242 /*
1243   update an extended DN, including all meta data fields
1244
1245   see replmd_build_la_val for value names
1246  */
1247 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
1248                                 struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
1249                                 uint64_t seq_num, time_t t, bool deleted)
1250 {
1251         struct ldb_dn *dn = dsdb_dn->dn;
1252         NTTIME now;
1253         const char *tstring, *usn_string;
1254         struct ldb_val tval;
1255         struct ldb_val iid;
1256         struct ldb_val usnv;
1257         struct ldb_val vers;
1258         const struct ldb_val *old_addtime, *old_version;
1259         NTSTATUS status;
1260         int ret;
1261         const char *dnstring;
1262
1263         unix_to_nt_time(&now, t);
1264         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)now);
1265         if (!tstring) {
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268         tval = data_blob_string_const(tstring);
1269
1270         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
1271         if (!usn_string) {
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274         usnv = data_blob_string_const(usn_string);
1275
1276         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
1277         if (!NT_STATUS_IS_OK(status)) {
1278                 return LDB_ERR_OPERATIONS_ERROR;
1279         }
1280
1281         if (deleted) {
1282                 struct ldb_val dv;
1283                 dv = data_blob_string_const("1");
1284                 ret = ldb_dn_set_extended_component(dn, "DELETED", &dv);
1285         } else {
1286                 ret = ldb_dn_set_extended_component(dn, "DELETED", NULL);
1287         }
1288         if (ret != LDB_SUCCESS) return ret;
1289
1290         /* get the ADDTIME from the original */
1291         old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
1292         if (old_addtime == NULL) {
1293                 old_addtime = &tval;
1294         }
1295         if (dsdb_dn != old_dsdb_dn) {
1296                 ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", old_addtime);
1297                 if (ret != LDB_SUCCESS) return ret;
1298         }
1299
1300         /* use our invocation id */
1301         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
1302         if (ret != LDB_SUCCESS) return ret;
1303
1304         /* changetime is the current time */
1305         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
1306         if (ret != LDB_SUCCESS) return ret;
1307
1308         /* update the USN */
1309         ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
1310         if (ret != LDB_SUCCESS) return ret;
1311
1312         ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &usnv);
1313         if (ret != LDB_SUCCESS) return ret;
1314
1315         /* increase the version by 1 */
1316         old_version = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_VERSION");
1317         if (old_version == NULL) {
1318                 vers = data_blob_string_const("0");
1319         } else {
1320                 char *vstring;
1321                 vstring = talloc_strndup(dn, (const char *)old_version->data, old_version->length);
1322                 if (!vstring) {
1323                         return LDB_ERR_OPERATIONS_ERROR;
1324                 }
1325                 vstring = talloc_asprintf(dn, "%lu",
1326                                           (unsigned long)strtoul(vstring, NULL, 0)+1);
1327                 vers = data_blob_string_const(vstring);
1328         }
1329         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
1330         if (ret != LDB_SUCCESS) return ret;
1331
1332         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
1333         if (dnstring == NULL) {
1334                 return LDB_ERR_OPERATIONS_ERROR;
1335         }
1336         *v = data_blob_string_const(dnstring);
1337
1338         return LDB_SUCCESS;
1339 }
1340
1341 /*
1342   handle adding a linked attribute
1343  */
1344 static int replmd_modify_la_add(struct ldb_module *module,
1345                                 struct dsdb_schema *schema,
1346                                 struct ldb_message *msg,
1347                                 struct ldb_message_element *el,
1348                                 struct ldb_message_element *old_el,
1349                                 const struct dsdb_attribute *schema_attr,
1350                                 uint64_t seq_num,
1351                                 time_t t,
1352                                 struct GUID *msg_guid)
1353 {
1354         int i;
1355         struct parsed_dn *dns, *old_dns;
1356         TALLOC_CTX *tmp_ctx = talloc_new(msg);
1357         int ret;
1358         struct ldb_val *new_values = NULL;
1359         unsigned int num_new_values = 0;
1360         unsigned old_num_values = old_el?old_el->num_values:0;
1361         const struct GUID *invocation_id;
1362         struct ldb_context *ldb = ldb_module_get_ctx(module);
1363
1364         ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid);
1365         if (ret != LDB_SUCCESS) {
1366                 talloc_free(tmp_ctx);
1367                 return ret;
1368         }
1369
1370         ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid);
1371         if (ret != LDB_SUCCESS) {
1372                 talloc_free(tmp_ctx);
1373                 return ret;
1374         }
1375
1376         invocation_id = samdb_ntds_invocation_id(ldb);
1377         if (!invocation_id) {
1378                 return LDB_ERR_OPERATIONS_ERROR;
1379         }
1380
1381         /* for each new value, see if it exists already with the same GUID */
1382         for (i=0; i<el->num_values; i++) {
1383                 struct parsed_dn *p = parsed_dn_find(old_dns, old_num_values, dns[i].guid);
1384                 if (p == NULL) {
1385                         /* this is a new linked attribute value */
1386                         new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val, num_new_values+1);
1387                         if (new_values == NULL) {
1388                                 ldb_module_oom(module);
1389                                 talloc_free(tmp_ctx);
1390                                 return LDB_ERR_OPERATIONS_ERROR;
1391                         }
1392                         ret = replmd_build_la_val(new_values, &new_values[num_new_values], dns[i].dsdb_dn,
1393                                                   invocation_id, seq_num, t);
1394                         if (ret != LDB_SUCCESS) {
1395                                 talloc_free(tmp_ctx);
1396                                 return ret;
1397                         }
1398                         num_new_values++;
1399                 } else {
1400                         /* this is only allowed if the GUID was
1401                            previously deleted. */
1402                         const struct ldb_val *v;
1403                         v = ldb_dn_get_extended_component(p->dsdb_dn->dn, "DELETED");
1404                         if (v == NULL) {
1405                                 ldb_asprintf_errstring(ldb, "Attribute %s already exists for target GUID %s",
1406                                                        el->name, GUID_string(tmp_ctx, p->guid));
1407                                 talloc_free(tmp_ctx);
1408                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1409                         }
1410                         ret = replmd_update_la_val(old_el->values, p->v, dns[i].dsdb_dn, p->dsdb_dn,
1411                                                    invocation_id, seq_num, t, false);
1412                         if (ret != LDB_SUCCESS) {
1413                                 talloc_free(tmp_ctx);
1414                                 return ret;
1415                         }
1416                 }
1417
1418                 ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr, true);
1419                 if (ret != LDB_SUCCESS) {
1420                         talloc_free(tmp_ctx);
1421                         return ret;
1422                 }
1423         }
1424
1425         /* add the new ones on to the end of the old values, constructing a new el->values */
1426         el->values = talloc_realloc(msg->elements, old_el?old_el->values:NULL,
1427                                     struct ldb_val,
1428                                     old_num_values+num_new_values);
1429         if (el->values == NULL) {
1430                 ldb_module_oom(module);
1431                 return LDB_ERR_OPERATIONS_ERROR;
1432         }
1433
1434         memcpy(&el->values[old_num_values], new_values, num_new_values*sizeof(struct ldb_val));
1435         el->num_values = old_num_values + num_new_values;
1436
1437         talloc_steal(msg->elements, el->values);
1438         talloc_steal(el->values, new_values);
1439
1440         talloc_free(tmp_ctx);
1441
1442         /* we now tell the backend to replace all existing values
1443            with the one we have constructed */
1444         el->flags = LDB_FLAG_MOD_REPLACE;
1445
1446         return LDB_SUCCESS;
1447 }
1448
1449
1450 /*
1451   handle deleting all active linked attributes
1452  */
1453 static int replmd_modify_la_delete(struct ldb_module *module,
1454                                    struct dsdb_schema *schema,
1455                                    struct ldb_message *msg,
1456                                    struct ldb_message_element *el,
1457                                    struct ldb_message_element *old_el,
1458                                    const struct dsdb_attribute *schema_attr,
1459                                    uint64_t seq_num,
1460                                    time_t t,
1461                                    struct GUID *msg_guid)
1462 {
1463         int i;
1464         struct parsed_dn *dns, *old_dns;
1465         TALLOC_CTX *tmp_ctx = talloc_new(msg);
1466         int ret;
1467         const struct GUID *invocation_id;
1468         struct ldb_context *ldb = ldb_module_get_ctx(module);
1469
1470         /* check if there is nothing to delete */
1471         if ((!old_el || old_el->num_values == 0) &&
1472             el->num_values == 0) {
1473                 return LDB_SUCCESS;
1474         }
1475
1476         if (!old_el || old_el->num_values == 0) {
1477                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
1478         }
1479
1480         ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid);
1481         if (ret != LDB_SUCCESS) {
1482                 talloc_free(tmp_ctx);
1483                 return ret;
1484         }
1485
1486         ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid);
1487         if (ret != LDB_SUCCESS) {
1488                 talloc_free(tmp_ctx);
1489                 return ret;
1490         }
1491
1492         invocation_id = samdb_ntds_invocation_id(ldb);
1493         if (!invocation_id) {
1494                 return LDB_ERR_OPERATIONS_ERROR;
1495         }
1496
1497         el->values = NULL;
1498
1499         /* see if we are being asked to delete any links that
1500            don't exist or are already deleted */
1501         for (i=0; i<el->num_values; i++) {
1502                 struct parsed_dn *p = &dns[i];
1503                 struct parsed_dn *p2;
1504                 const struct ldb_val *v;
1505
1506                 p2 = parsed_dn_find(old_dns, old_el->num_values, p->guid);
1507                 if (!p2) {
1508                         ldb_asprintf_errstring(ldb, "Attribute %s doesn't exist for target GUID %s",
1509                                                el->name, GUID_string(tmp_ctx, p->guid));
1510                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1511                 }
1512                 v = ldb_dn_get_extended_component(p2->dsdb_dn->dn, "DELETED");
1513                 if (v) {
1514                         ldb_asprintf_errstring(ldb, "Attribute %s already deleted for target GUID %s",
1515                                                el->name, GUID_string(tmp_ctx, p->guid));
1516                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1517                 }
1518         }
1519
1520         /* for each new value, see if it exists already with the same GUID
1521            if it is not already deleted and matches the delete list then delete it
1522         */
1523         for (i=0; i<old_el->num_values; i++) {
1524                 struct parsed_dn *p = &old_dns[i];
1525                 const struct ldb_val *v;
1526
1527                 if (dns && parsed_dn_find(dns, el->num_values, p->guid) == NULL) {
1528                         continue;
1529                 }
1530
1531                 v = ldb_dn_get_extended_component(p->dsdb_dn->dn, "DELETED");
1532                 if (v != NULL) continue;
1533
1534                 ret = replmd_update_la_val(old_el->values, p->v, p->dsdb_dn, p->dsdb_dn,
1535                                            invocation_id, seq_num, t, true);
1536                 if (ret != LDB_SUCCESS) {
1537                         talloc_free(tmp_ctx);
1538                         return ret;
1539                 }
1540
1541                 ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr, true);
1542                 if (ret != LDB_SUCCESS) {
1543                         talloc_free(tmp_ctx);
1544                         return ret;
1545                 }
1546         }
1547
1548         el->values = talloc_steal(msg->elements, old_el->values);
1549         el->num_values = old_el->num_values;
1550
1551         talloc_free(tmp_ctx);
1552
1553         /* we now tell the backend to replace all existing values
1554            with the one we have constructed */
1555         el->flags = LDB_FLAG_MOD_REPLACE;
1556
1557         return LDB_SUCCESS;
1558 }
1559
1560 /*
1561   handle replacing a linked attribute
1562  */
1563 static int replmd_modify_la_replace(struct ldb_module *module,
1564                                     struct dsdb_schema *schema,
1565                                     struct ldb_message *msg,
1566                                     struct ldb_message_element *el,
1567                                     struct ldb_message_element *old_el,
1568                                     const struct dsdb_attribute *schema_attr,
1569                                     uint64_t seq_num,
1570                                     time_t t,
1571                                     struct GUID *msg_guid)
1572 {
1573         int i;
1574         struct parsed_dn *dns, *old_dns;
1575         TALLOC_CTX *tmp_ctx = talloc_new(msg);
1576         int ret;
1577         const struct GUID *invocation_id;
1578         struct ldb_context *ldb = ldb_module_get_ctx(module);
1579         struct ldb_val *new_values = NULL;
1580         uint32_t num_new_values = 0;
1581         unsigned old_num_values = old_el?old_el->num_values:0;
1582
1583         /* check if there is nothing to replace */
1584         if ((!old_el || old_el->num_values == 0) &&
1585             el->num_values == 0) {
1586                 return LDB_SUCCESS;
1587         }
1588
1589         ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid);
1590         if (ret != LDB_SUCCESS) {
1591                 talloc_free(tmp_ctx);
1592                 return ret;
1593         }
1594
1595         ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid);
1596         if (ret != LDB_SUCCESS) {
1597                 talloc_free(tmp_ctx);
1598                 return ret;
1599         }
1600
1601         invocation_id = samdb_ntds_invocation_id(ldb);
1602         if (!invocation_id) {
1603                 return LDB_ERR_OPERATIONS_ERROR;
1604         }
1605
1606         /* mark all the old ones as deleted */
1607         for (i=0; i<old_num_values; i++) {
1608                 struct parsed_dn *old_p = &old_dns[i];
1609                 struct parsed_dn *p;
1610                 const struct ldb_val *v;
1611
1612                 v = ldb_dn_get_extended_component(old_p->dsdb_dn->dn, "DELETED");
1613                 if (v) continue;
1614
1615                 ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr, false);
1616                 if (ret != LDB_SUCCESS) {
1617                         talloc_free(tmp_ctx);
1618                         return ret;
1619                 }
1620
1621                 p = parsed_dn_find(dns, el->num_values, old_p->guid);
1622                 if (p) {
1623                         /* we don't delete it if we are re-adding it */
1624                         continue;
1625                 }
1626
1627                 ret = replmd_update_la_val(old_el->values, old_p->v, old_p->dsdb_dn, old_p->dsdb_dn,
1628                                            invocation_id, seq_num, t, true);
1629                 if (ret != LDB_SUCCESS) {
1630                         talloc_free(tmp_ctx);
1631                         return ret;
1632                 }
1633         }
1634
1635         /* for each new value, either update its meta-data, or add it
1636          * to old_el
1637         */
1638         for (i=0; i<el->num_values; i++) {
1639                 struct parsed_dn *p = &dns[i], *old_p;
1640
1641                 if (old_dns &&
1642                     (old_p = parsed_dn_find(old_dns,
1643                                             old_num_values, p->guid)) != NULL) {
1644                         /* update in place */
1645                         ret = replmd_update_la_val(old_el->values, old_p->v, old_p->dsdb_dn,
1646                                                    old_p->dsdb_dn, invocation_id,
1647                                                    seq_num, t, false);
1648                         if (ret != LDB_SUCCESS) {
1649                                 talloc_free(tmp_ctx);
1650                                 return ret;
1651                         }
1652                 } else {
1653                         /* add a new one */
1654                         new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val,
1655                                                     num_new_values+1);
1656                         if (new_values == NULL) {
1657                                 ldb_module_oom(module);
1658                                 talloc_free(tmp_ctx);
1659                                 return LDB_ERR_OPERATIONS_ERROR;
1660                         }
1661                         ret = replmd_build_la_val(new_values, &new_values[num_new_values], dns[i].dsdb_dn,
1662                                                   invocation_id, seq_num, t);
1663                         if (ret != LDB_SUCCESS) {
1664                                 talloc_free(tmp_ctx);
1665                                 return ret;
1666                         }
1667                         num_new_values++;
1668                 }
1669
1670                 ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr, false);
1671                 if (ret != LDB_SUCCESS) {
1672                         talloc_free(tmp_ctx);
1673                         return ret;
1674                 }
1675         }
1676
1677         /* add the new values to the end of old_el */
1678         if (num_new_values != 0) {
1679                 el->values = talloc_realloc(msg->elements, old_el?old_el->values:NULL,
1680                                             struct ldb_val, old_num_values+num_new_values);
1681                 if (el->values == NULL) {
1682                         ldb_module_oom(module);
1683                         return LDB_ERR_OPERATIONS_ERROR;
1684                 }
1685                 memcpy(&el->values[old_num_values], &new_values[0],
1686                        sizeof(struct ldb_val)*num_new_values);
1687                 el->num_values = old_num_values + num_new_values;
1688                 talloc_steal(msg->elements, new_values);
1689         } else {
1690                 el->values = old_el->values;
1691                 el->num_values = old_el->num_values;
1692                 talloc_steal(msg->elements, el->values);
1693         }
1694
1695         talloc_free(tmp_ctx);
1696
1697         /* we now tell the backend to replace all existing values
1698            with the one we have constructed */
1699         el->flags = LDB_FLAG_MOD_REPLACE;
1700
1701         return LDB_SUCCESS;
1702 }
1703
1704
1705 /*
1706   handle linked attributes in modify requests
1707  */
1708 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
1709                                                struct ldb_message *msg,
1710                                                uint64_t seq_num, time_t t)
1711 {
1712         struct ldb_result *res;
1713         int ret, i;
1714         struct ldb_context *ldb = ldb_module_get_ctx(module);
1715         struct ldb_message *old_msg;
1716         struct dsdb_schema *schema = dsdb_get_schema(ldb);
1717         struct GUID old_guid;
1718
1719         if (seq_num == 0) {
1720                 /* there the replmd_update_rpmd code has already
1721                  * checked and saw that there are no linked
1722                  * attributes */
1723                 return LDB_SUCCESS;
1724         }
1725
1726 #if !W2K3_LINKED_ATTRIBUTES
1727         return LDB_SUCCESS;
1728 #endif
1729
1730         if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
1731                 /* don't do anything special for linked attributes */
1732                 return LDB_SUCCESS;
1733         }
1734
1735         ret = dsdb_module_search_dn(module, msg, &res, msg->dn, NULL,
1736                                     DSDB_SEARCH_SHOW_DELETED |
1737                                     DSDB_SEARCH_REVEAL_INTERNALS |
1738                                     DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT);
1739         if (ret != LDB_SUCCESS) {
1740                 return ret;
1741         }
1742         old_msg = res->msgs[0];
1743
1744         old_guid = samdb_result_guid(old_msg, "objectGUID");
1745
1746         for (i=0; i<msg->num_elements; i++) {
1747                 struct ldb_message_element *el = &msg->elements[i];
1748                 struct ldb_message_element *old_el, *new_el;
1749                 const struct dsdb_attribute *schema_attr
1750                         = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
1751                 if (!schema_attr) {
1752                         ldb_asprintf_errstring(ldb,
1753                                                "attribute %s is not a valid attribute in schema", el->name);
1754                         return LDB_ERR_OBJECT_CLASS_VIOLATION;
1755                 }
1756                 if (schema_attr->linkID == 0) {
1757                         continue;
1758                 }
1759                 if ((schema_attr->linkID & 1) == 1) {
1760                         /* Odd is for the target.  Illegal to modify */
1761                         ldb_asprintf_errstring(ldb,
1762                                                "attribute %s must not be modified directly, it is a linked attribute", el->name);
1763                         return LDB_ERR_UNWILLING_TO_PERFORM;
1764                 }
1765                 old_el = ldb_msg_find_element(old_msg, el->name);
1766                 switch (el->flags & LDB_FLAG_MOD_MASK) {
1767                 case LDB_FLAG_MOD_REPLACE:
1768                         ret = replmd_modify_la_replace(module, schema, msg, el, old_el, schema_attr, seq_num, t, &old_guid);
1769                         break;
1770                 case LDB_FLAG_MOD_DELETE:
1771                         ret = replmd_modify_la_delete(module, schema, msg, el, old_el, schema_attr, seq_num, t, &old_guid);
1772                         break;
1773                 case LDB_FLAG_MOD_ADD:
1774                         ret = replmd_modify_la_add(module, schema, msg, el, old_el, schema_attr, seq_num, t, &old_guid);
1775                         break;
1776                 default:
1777                         ldb_asprintf_errstring(ldb,
1778                                                "invalid flags 0x%x for %s linked attribute",
1779                                                el->flags, el->name);
1780                         return LDB_ERR_UNWILLING_TO_PERFORM;
1781                 }
1782                 if (ret != LDB_SUCCESS) {
1783                         return ret;
1784                 }
1785                 if (old_el) {
1786                         ldb_msg_remove_attr(old_msg, el->name);
1787                 }
1788                 ldb_msg_add_empty(old_msg, el->name, 0, &new_el);
1789                 new_el->num_values = el->num_values;
1790                 new_el->values = el->values;
1791
1792                 /* TODO: this relises a bit too heavily on the exact
1793                    behaviour of ldb_msg_find_element and
1794                    ldb_msg_remove_element */
1795                 old_el = ldb_msg_find_element(msg, el->name);
1796                 if (old_el != el) {
1797                         ldb_msg_remove_element(msg, old_el);
1798                         i--;
1799                 }
1800         }
1801
1802         talloc_free(res);
1803         return ret;
1804 }
1805
1806
1807
1808 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
1809 {
1810         struct ldb_context *ldb;
1811         struct replmd_replicated_request *ac;
1812         struct ldb_request *down_req;
1813         struct ldb_message *msg;
1814         time_t t = time(NULL);
1815         int ret;
1816
1817         /* do not manipulate our control entries */
1818         if (ldb_dn_is_special(req->op.mod.message->dn)) {
1819                 return ldb_next_request(module, req);
1820         }
1821
1822         ldb = ldb_module_get_ctx(module);
1823
1824         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
1825
1826         ac = replmd_ctx_init(module, req);
1827         if (!ac) {
1828                 return LDB_ERR_OPERATIONS_ERROR;
1829         }
1830
1831         /* we have to copy the message as the caller might have it as a const */
1832         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
1833         if (msg == NULL) {
1834                 ldb_oom(ldb);
1835                 talloc_free(ac);
1836                 return LDB_ERR_OPERATIONS_ERROR;
1837         }
1838
1839         ret = replmd_update_rpmd(module, ac->schema, msg, &ac->seq_num, t);
1840         if (ret != LDB_SUCCESS) {
1841                 talloc_free(ac);
1842                 return ret;
1843         }
1844
1845         ret = replmd_modify_handle_linked_attribs(module, msg, ac->seq_num, t);
1846         if (ret != LDB_SUCCESS) {
1847                 talloc_free(ac);
1848                 return ret;
1849         }
1850
1851         /* TODO:
1852          * - replace the old object with the newly constructed one
1853          */
1854
1855         ret = ldb_build_mod_req(&down_req, ldb, ac,
1856                                 msg,
1857                                 req->controls,
1858                                 ac, replmd_op_callback,
1859                                 req);
1860         if (ret != LDB_SUCCESS) {
1861                 talloc_free(ac);
1862                 return ret;
1863         }
1864         talloc_steal(down_req, msg);
1865
1866         /* we only change whenChanged and uSNChanged if the seq_num
1867            has changed */
1868         if (ac->seq_num != 0) {
1869                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1870                         talloc_free(ac);
1871                         return ret;
1872                 }
1873
1874                 if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1875                         talloc_free(ac);
1876                         return ret;
1877                 }
1878         }
1879
1880         /* go on with the call chain */
1881         return ldb_next_request(module, down_req);
1882 }
1883
1884 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
1885
1886 /*
1887   handle a rename request
1888
1889   On a rename we need to do an extra ldb_modify which sets the
1890   whenChanged and uSNChanged attributes.  We do this in a callback after the success.
1891  */
1892 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
1893 {
1894         struct ldb_context *ldb;
1895         struct replmd_replicated_request *ac;
1896         int ret;
1897         struct ldb_request *down_req;
1898
1899         /* do not manipulate our control entries */
1900         if (ldb_dn_is_special(req->op.mod.message->dn)) {
1901                 return ldb_next_request(module, req);
1902         }
1903
1904         ldb = ldb_module_get_ctx(module);
1905
1906         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
1907
1908         ac = replmd_ctx_init(module, req);
1909         if (!ac) {
1910                 return LDB_ERR_OPERATIONS_ERROR;
1911         }
1912         ret = ldb_build_rename_req(&down_req, ldb, ac,
1913                                    ac->req->op.rename.olddn,
1914                                    ac->req->op.rename.newdn,
1915                                    ac->req->controls,
1916                                    ac, replmd_rename_callback,
1917                                    ac->req);
1918
1919         if (ret != LDB_SUCCESS) {
1920                 talloc_free(ac);
1921                 return ret;
1922         }
1923
1924         /* go on with the call chain */
1925         return ldb_next_request(module, down_req);
1926 }
1927
1928 /* After the rename is compleated, update the whenchanged etc */
1929 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
1930 {
1931         struct ldb_context *ldb;
1932         struct replmd_replicated_request *ac;
1933         struct ldb_request *down_req;
1934         struct ldb_message *msg;
1935         time_t t = time(NULL);
1936         int ret;
1937
1938         ac = talloc_get_type(req->context, struct replmd_replicated_request);
1939         ldb = ldb_module_get_ctx(ac->module);
1940
1941         if (ares->error != LDB_SUCCESS) {
1942                 return ldb_module_done(ac->req, ares->controls,
1943                                         ares->response, ares->error);
1944         }
1945
1946         if (ares->type != LDB_REPLY_DONE) {
1947                 ldb_set_errstring(ldb,
1948                                   "invalid ldb_reply_type in callback");
1949                 talloc_free(ares);
1950                 return ldb_module_done(ac->req, NULL, NULL,
1951                                         LDB_ERR_OPERATIONS_ERROR);
1952         }
1953
1954         /* Get a sequence number from the backend */
1955         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
1956         if (ret != LDB_SUCCESS) {
1957                 return ret;
1958         }
1959
1960         /* TODO:
1961          * - replace the old object with the newly constructed one
1962          */
1963
1964         msg = ldb_msg_new(ac);
1965         if (msg == NULL) {
1966                 ldb_oom(ldb);
1967                 return LDB_ERR_OPERATIONS_ERROR;
1968         }
1969
1970         msg->dn = ac->req->op.rename.newdn;
1971
1972         ret = ldb_build_mod_req(&down_req, ldb, ac,
1973                                 msg,
1974                                 req->controls,
1975                                 ac, replmd_op_callback,
1976                                 req);
1977
1978         if (ret != LDB_SUCCESS) {
1979                 talloc_free(ac);
1980                 return ret;
1981         }
1982         talloc_steal(down_req, msg);
1983
1984         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1985                 talloc_free(ac);
1986                 return ret;
1987         }
1988         
1989         if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1990                 talloc_free(ac);
1991                 return ret;
1992         }
1993
1994         /* go on with the call chain - do the modify after the rename */
1995         return ldb_next_request(ac->module, down_req);
1996 }
1997
1998 /* remove forwards and backlinks as needed when an object
1999    is deleted */
2000 static int replmd_delete_remove_link(struct ldb_module *module,
2001                                      struct dsdb_schema *schema,
2002                                      struct ldb_dn *dn,
2003                                      struct ldb_message_element *el,
2004                                      const struct dsdb_attribute *sa)
2005 {
2006         int i;
2007         TALLOC_CTX *tmp_ctx = talloc_new(module);
2008         struct ldb_context *ldb = ldb_module_get_ctx(module);
2009
2010         for (i=0; i<el->num_values; i++) {
2011                 struct dsdb_dn *dsdb_dn;
2012                 NTSTATUS status;
2013                 int ret;
2014                 struct GUID guid2;
2015                 struct ldb_message *msg;
2016                 const struct dsdb_attribute *target_attr;
2017                 struct ldb_message_element *el2;
2018                 struct ldb_val dn_val;
2019
2020                 if (dsdb_dn_is_deleted_val(&el->values[i])) {
2021                         continue;
2022                 }
2023
2024                 dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, &el->values[i], sa->syntax->ldap_oid);
2025                 if (!dsdb_dn) {
2026                         talloc_free(tmp_ctx);
2027                         return LDB_ERR_OPERATIONS_ERROR;
2028                 }
2029
2030                 status = dsdb_get_extended_dn_guid(dsdb_dn->dn, &guid2);
2031                 if (!NT_STATUS_IS_OK(status)) {
2032                         talloc_free(tmp_ctx);
2033                         return LDB_ERR_OPERATIONS_ERROR;
2034                 }
2035
2036                 /* remove the link */
2037                 msg = ldb_msg_new(tmp_ctx);
2038                 if (!msg) {
2039                         ldb_module_oom(module);
2040                         talloc_free(tmp_ctx);
2041                         return LDB_ERR_OPERATIONS_ERROR;
2042                 }
2043
2044
2045                 msg->dn = dsdb_dn->dn;
2046
2047                 target_attr = dsdb_attribute_by_linkID(schema, sa->linkID ^ 1);
2048                 if (target_attr == NULL) {
2049                         continue;
2050                 }
2051
2052                 ret = ldb_msg_add_empty(msg, target_attr->lDAPDisplayName, LDB_FLAG_MOD_DELETE, &el2);
2053                 if (ret != LDB_SUCCESS) {
2054                         ldb_module_oom(module);
2055                         talloc_free(tmp_ctx);
2056                         return LDB_ERR_OPERATIONS_ERROR;
2057                 }
2058                 dn_val = data_blob_string_const(ldb_dn_get_linearized(dn));
2059                 el2->values = &dn_val;
2060                 el2->num_values = 1;
2061
2062                 ret = dsdb_module_modify(module, msg, 0);
2063                 if (ret != LDB_SUCCESS) {
2064                         talloc_free(tmp_ctx);
2065                         return ret;
2066                 }
2067         }
2068         talloc_free(tmp_ctx);
2069         return LDB_SUCCESS;
2070 }
2071
2072
2073 /*
2074   handle update of replication meta data for deletion of objects
2075
2076   This also handles the mapping of delete to a rename operation
2077   to allow deletes to be replicated.
2078  */
2079 static int replmd_delete(struct ldb_module *module, struct ldb_request *req)
2080 {
2081         int ret = LDB_ERR_OTHER;
2082         bool retb;
2083         struct ldb_dn *old_dn, *new_dn;
2084         const char *rdn_name;
2085         const struct ldb_val *rdn_value, *new_rdn_value;
2086         struct GUID guid;
2087         struct ldb_context *ldb = ldb_module_get_ctx(module);
2088         struct dsdb_schema *schema = dsdb_get_schema(ldb);
2089         struct ldb_message *msg, *old_msg;
2090         struct ldb_message_element *el;
2091         TALLOC_CTX *tmp_ctx;
2092         struct ldb_result *res, *parent_res;
2093         const char *preserved_attrs[] = {
2094                 /* yes, this really is a hard coded list. See MS-ADTS
2095                    section 3.1.1.5.5.1.1 */
2096                 "nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
2097                 "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
2098                 "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
2099                 "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
2100                 "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
2101                 "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
2102                 "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreate",
2103                 NULL};
2104         uint32_t el_count = 0;
2105         int i;
2106
2107         tmp_ctx = talloc_new(ldb);
2108
2109         old_dn = ldb_dn_copy(tmp_ctx, req->op.del.dn);
2110
2111         /* we need the complete msg off disk, so we can work out which
2112            attributes need to be removed */
2113         ret = dsdb_module_search_dn(module, tmp_ctx, &res, old_dn, NULL,
2114                                     DSDB_SEARCH_SHOW_DELETED |
2115                                     DSDB_SEARCH_REVEAL_INTERNALS |
2116                                     DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT);
2117         if (ret != LDB_SUCCESS) {
2118                 talloc_free(tmp_ctx);
2119                 return ret;
2120         }
2121         old_msg = res->msgs[0];
2122
2123         /* work out where we will be renaming this object to */
2124         ret = dsdb_get_deleted_objects_dn(ldb, tmp_ctx, old_dn, &new_dn);
2125         if (ret != LDB_SUCCESS) {
2126                 /* this is probably an attempted delete on a partition
2127                  * that doesn't allow delete operations, such as the
2128                  * schema partition */
2129                 ldb_asprintf_errstring(ldb, "No Deleted Objects container for DN %s",
2130                                        ldb_dn_get_linearized(old_dn));
2131                 talloc_free(tmp_ctx);
2132                 return LDB_ERR_UNWILLING_TO_PERFORM;
2133         }
2134
2135         rdn_name = ldb_dn_get_rdn_name(old_dn);
2136         rdn_value = ldb_dn_get_rdn_val(old_dn);
2137
2138         /* get the objects GUID from the search we just did */
2139         guid = samdb_result_guid(old_msg, "objectGUID");
2140
2141         /* Add a formatted child */
2142         retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
2143                                     rdn_name,
2144                                     rdn_value->data,
2145                                     GUID_string(tmp_ctx, &guid));
2146         if (!retb) {
2147                 DEBUG(0,(__location__ ": Unable to add a formatted child to dn: %s",
2148                                 ldb_dn_get_linearized(new_dn)));
2149                 talloc_free(tmp_ctx);
2150                 return LDB_ERR_OPERATIONS_ERROR;
2151         }
2152
2153         /*
2154           now we need to modify the object in the following ways:
2155
2156           - add isDeleted=TRUE
2157           - update rDN and name, with new rDN
2158           - remove linked attributes
2159           - remove objectCategory and sAMAccountType
2160           - remove attribs not on the preserved list
2161              - preserved if in above list, or is rDN
2162           - remove all linked attribs from this object
2163           - remove all links from other objects to this object
2164           - add lastKnownParent
2165           - update replPropertyMetaData?
2166
2167           see MS-ADTS "Tombstone Requirements" section 3.1.1.5.5.1.1
2168          */
2169
2170         msg = ldb_msg_new(tmp_ctx);
2171         if (msg == NULL) {
2172                 ldb_module_oom(module);
2173                 talloc_free(tmp_ctx);
2174                 return LDB_ERR_OPERATIONS_ERROR;
2175         }
2176
2177         msg->dn = old_dn;
2178
2179         ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
2180         if (ret != LDB_SUCCESS) {
2181                 DEBUG(0,(__location__ ": Failed to add isDeleted string to the msg\n"));
2182                 ldb_module_oom(module);
2183                 talloc_free(tmp_ctx);
2184                 return ret;
2185         }
2186         msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
2187
2188         /* we need the storage form of the parent GUID */
2189         ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
2190                                     ldb_dn_get_parent(tmp_ctx, old_dn), NULL,
2191                                     DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT);
2192         if (ret != LDB_SUCCESS) {
2193                 talloc_free(tmp_ctx);
2194                 return ret;
2195         }
2196
2197         ret = ldb_msg_add_steal_string(msg, "lastKnownParent",
2198                                        ldb_dn_get_extended_linearized(tmp_ctx, parent_res->msgs[0]->dn, 1));
2199         if (ret != LDB_SUCCESS) {
2200                 DEBUG(0,(__location__ ": Failed to add lastKnownParent string to the msg\n"));
2201                 ldb_module_oom(module);
2202                 talloc_free(tmp_ctx);
2203                 return ret;
2204         }
2205         msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
2206
2207         /* work out which of the old attributes we will be removing */
2208         for (i=0; i<old_msg->num_elements; i++) {
2209                 const struct dsdb_attribute *sa;
2210                 el = &old_msg->elements[i];
2211                 sa = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
2212                 if (!sa) {
2213                         talloc_free(tmp_ctx);
2214                         return LDB_ERR_OPERATIONS_ERROR;
2215                 }
2216                 if (ldb_attr_cmp(el->name, rdn_name) == 0) {
2217                         /* don't remove the rDN */
2218                         continue;
2219                 }
2220
2221                 if (sa->linkID) {
2222                         ret = replmd_delete_remove_link(module, schema, old_dn, el, sa);
2223                         if (ret != LDB_SUCCESS) {
2224                                 talloc_free(tmp_ctx);
2225                                 return LDB_ERR_OPERATIONS_ERROR;
2226                         }
2227                 }
2228
2229                 if (!sa->linkID && ldb_attr_in_list(preserved_attrs, el->name)) {
2230                         continue;
2231                 }
2232
2233                 ret = ldb_msg_add_empty(msg, el->name, LDB_FLAG_MOD_DELETE, &el);
2234                 if (ret != LDB_SUCCESS) {
2235                         talloc_free(tmp_ctx);
2236                         ldb_module_oom(module);
2237                         return ret;
2238                 }
2239         }
2240
2241         /* work out what the new rdn value is, for updating the
2242            rDN and name fields */
2243         new_rdn_value = ldb_dn_get_rdn_val(new_dn);
2244         ret = ldb_msg_add_value(msg, rdn_name, new_rdn_value, &el);
2245         if (ret != LDB_SUCCESS) {
2246                 talloc_free(tmp_ctx);
2247                 return ret;
2248         }
2249         el->flags = LDB_FLAG_MOD_REPLACE;
2250
2251         el = ldb_msg_find_element(old_msg, "name");
2252         if (el) {
2253                 ret = ldb_msg_add_value(msg, "name", new_rdn_value, &el);
2254                 if (ret != LDB_SUCCESS) {
2255                         talloc_free(tmp_ctx);
2256                         return ret;
2257                 }
2258                 el->flags = LDB_FLAG_MOD_REPLACE;
2259         }
2260
2261         ret = dsdb_module_modify(module, msg, 0);
2262         if (ret != LDB_SUCCESS){
2263                 ldb_asprintf_errstring(ldb, "replmd_delete: Failed to modify object %s in delete - %s",
2264                                        ldb_dn_get_linearized(old_dn), ldb_errstring(ldb));
2265                 talloc_free(tmp_ctx);
2266                 return ret;
2267         }
2268
2269         /* now rename onto the new DN */
2270         ret = dsdb_module_rename(module, old_dn, new_dn, 0);
2271         if (ret != LDB_SUCCESS){
2272                 DEBUG(0,(__location__ ": Failed to rename object from '%s' to '%s' - %s\n",
2273                          ldb_dn_get_linearized(old_dn),
2274                          ldb_dn_get_linearized(new_dn),
2275                          ldb_errstring(ldb)));
2276                 talloc_free(tmp_ctx);
2277                 return ret;
2278         }
2279
2280         talloc_free(tmp_ctx);
2281
2282         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
2283 }
2284
2285
2286
/*
  report an LDB error for a replicated request

  ret is handed back unchanged; ar is not looked at
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	(void)ar;

	return ret;
}
2291
2292 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
2293 {
2294         int ret = LDB_ERR_OTHER;
2295         /* TODO: do some error mapping */
2296         return ret;
2297 }
2298
2299 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
2300 {
2301         struct ldb_context *ldb;
2302         struct ldb_request *change_req;
2303         enum ndr_err_code ndr_err;
2304         struct ldb_message *msg;
2305         struct replPropertyMetaDataBlob *md;
2306         struct ldb_val md_value;
2307         uint32_t i;
2308         int ret;
2309
2310         /*
2311          * TODO: check if the parent object exist
2312          */
2313
2314         /*
2315          * TODO: handle the conflict case where an object with the
2316          *       same name exist
2317          */
2318
2319         ldb = ldb_module_get_ctx(ar->module);
2320         msg = ar->objs->objects[ar->index_current].msg;
2321         md = ar->objs->objects[ar->index_current].meta_data;
2322
2323         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
2324         if (ret != LDB_SUCCESS) {
2325                 return replmd_replicated_request_error(ar, ret);
2326         }
2327
2328         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
2329         if (ret != LDB_SUCCESS) {
2330                 return replmd_replicated_request_error(ar, ret);
2331         }
2332
2333         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
2334         if (ret != LDB_SUCCESS) {
2335                 return replmd_replicated_request_error(ar, ret);
2336         }
2337
2338         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
2339         if (ret != LDB_SUCCESS) {
2340                 return replmd_replicated_request_error(ar, ret);
2341         }
2342
2343         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
2344         if (ret != LDB_SUCCESS) {
2345                 return replmd_replicated_request_error(ar, ret);
2346         }
2347
2348         /* remove any message elements that have zero values */
2349         for (i=0; i<msg->num_elements; i++) {
2350                 if (msg->elements[i].num_values == 0) {
2351                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
2352                                  msg->elements[i].name));
2353                         memmove(&msg->elements[i], 
2354                                 &msg->elements[i+1], 
2355                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
2356                         msg->num_elements--;
2357                         i--;
2358                 }
2359         }
2360         
2361         /*
2362          * the meta data array is already sorted by the caller
2363          */
2364         for (i=0; i < md->ctr.ctr1.count; i++) {
2365                 md->ctr.ctr1.array[i].local_usn = ar->seq_num;
2366         }
2367         ndr_err = ndr_push_struct_blob(&md_value, msg, 
2368                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
2369                                        md,
2370                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
2371         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2372                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2373                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2374         }
2375         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
2376         if (ret != LDB_SUCCESS) {
2377                 return replmd_replicated_request_error(ar, ret);
2378         }
2379
2380         replmd_ldb_message_sort(msg, ar->schema);
2381
2382         if (DEBUGLVL(4)) {
2383                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
2384                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
2385                 talloc_free(s);
2386         }
2387
2388         ret = ldb_build_add_req(&change_req,
2389                                 ldb,
2390                                 ar,
2391                                 msg,
2392                                 ar->controls,
2393                                 ar,
2394                                 replmd_op_callback,
2395                                 ar->req);
2396         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2397
2398         return ldb_next_request(ar->module, change_req);
2399 }
2400
2401 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
2402                                                          struct replPropertyMetaData1 *m2)
2403 {
2404         int ret;
2405
2406         if (m1->version != m2->version) {
2407                 return m1->version - m2->version;
2408         }
2409
2410         if (m1->originating_change_time != m2->originating_change_time) {
2411                 return m1->originating_change_time - m2->originating_change_time;
2412         }
2413
2414         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
2415         if (ret != 0) {
2416                 return ret;
2417         }
2418
2419         return m1->originating_usn - m2->originating_usn;
2420 }
2421
/*
 * Apply the replicated object at ar->index_current to the entry that was
 * already found locally (ar->search_msg).
 *
 * - If the incoming DN differs from the stored DN the entry is renamed
 *   first, using ldb_rename().
 * - The incoming replPropertyMetaData entries (rmd) are merged with the
 *   stored ones (omd) into nmd.  For a matching attid the incoming entry
 *   replaces the stored one only when it wins
 *   replmd_replPropertyMetaData1_conflict_compare(); otherwise an element
 *   is removed from the change message.
 * - If no elements are left, the modify is skipped and processing moves on
 *   to the next object.
 * - Otherwise a new sequence number is taken, whenChanged, uSNChanged and
 *   replPropertyMetaData are added, every element is flagged as REPLACE and
 *   the modify request is handed to the next module with replmd_op_callback
 *   as its completion callback.
 *
 * Returns an LDB error code.
 */
2422 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
2423 {
2424         struct ldb_context *ldb;
2425         struct ldb_request *change_req;
2426         enum ndr_err_code ndr_err;
2427         struct ldb_message *msg;
2428         struct replPropertyMetaDataBlob *rmd;
2429         struct replPropertyMetaDataBlob omd;
2430         const struct ldb_val *omd_value;
2431         struct replPropertyMetaDataBlob nmd;
2432         struct ldb_val nmd_value;
2433         uint32_t i,j,ni=0;
2434         uint32_t removed_attrs = 0;
2435         int ret;
2436
2437         ldb = ldb_module_get_ctx(ar->module);
2438         msg = ar->objs->objects[ar->index_current].msg;
2439         rmd = ar->objs->objects[ar->index_current].meta_data;
2440         ZERO_STRUCT(omd);
2441         omd.version = 1;
2442
2443         /*
2444          * TODO: check repl data is correct after a rename
2445          */
2446         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
2447                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
2448                           ldb_dn_get_linearized(ar->search_msg->dn),
2449                           ldb_dn_get_linearized(msg->dn));
2450                 /* we can't use dsdb_module_rename() here as we need
2451                    the rename call to be intercepted by this module, to
2452                    allow it to process linked attribute changes */
2453                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
2454                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
2455                                   ldb_dn_get_linearized(ar->search_msg->dn),
2456                                   ldb_dn_get_linearized(msg->dn),
2457                                   ldb_errstring(ldb));
2458                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
2459                 }
2460         }
2461
2462         /* find existing meta data */
2463         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
2464         if (omd_value) {
2465                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
2466                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
2467                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
2468                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2469                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2470                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2471                 }
2472
2473                 if (omd.version != 1) {
2474                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2475                 }
2476         }
2477
             /* size nmd for the worst case: no attid is shared between omd and rmd */
2478         ZERO_STRUCT(nmd);
2479         nmd.version = 1;
2480         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
2481         nmd.ctr.ctr1.array = talloc_array(ar,
2482                                           struct replPropertyMetaData1,
2483                                           nmd.ctr.ctr1.count);
2484         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2485
2486         /* first copy the old meta data */
2487         for (i=0; i < omd.ctr.ctr1.count; i++) {
2488                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
2489                 ni++;
2490         }
2491
2492         /* now merge in the new meta data */
2493         for (i=0; i < rmd->ctr.ctr1.count; i++) {
2494                 bool found = false;
2495
2496                 for (j=0; j < ni; j++) {
2497                         int cmp;
2498
2499                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
2500                                 continue;
2501                         }
2502
2503                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
2504                                                                             &nmd.ctr.ctr1.array[j]);
2505                         if (cmp > 0) {
2506                                 /* replace the entry */
2507                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
2508                                 found = true;
2509                                 break;
2510                         }
2511
                             /* NOTE(review): the element is picked by the metadata index i
                                (minus the removals so far); this presumes msg->elements and
                                rmd->ctr.ctr1.array are in the same order - confirm against
                                the code that builds the replicated objects */
2512                         /* we don't want to apply this change so remove the attribute */
2513                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
2514                         removed_attrs++;
2515
2516                         found = true;
2517                         break;
2518                 }
2519
2520                 if (found) continue;
2521
                     /* attid not present locally yet: append the incoming entry */
2522                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
2523                 ni++;
2524         }
2525
2526         /*
2527          * finally correct the size of the meta_data array
2528          */
2529         nmd.ctr.ctr1.count = ni;
2530
2531         /*
2532          * the rdn attribute (the alias for the name attribute),
2533          * 'cn' for most objects is the last entry in the meta data array
2534          * we have stored
2535          *
2536          * sort the new meta data array
2537          */
2538         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
2539         if (ret != LDB_SUCCESS) {
2540                 return ret;
2541         }
2542
2543         /*
2544          * check if some replicated attributes left, otherwise skip the ldb_modify() call
2545          */
2546         if (msg->num_elements == 0) {
2547                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
2548                           ar->index_current);
2549
2550                 ar->index_current++;
2551                 return replmd_replicated_apply_next(ar);
2552         }
2553
2554         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
2555                   ar->index_current, msg->num_elements);
2556
2557         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
2558         if (ret != LDB_SUCCESS) {
2559                 return replmd_replicated_request_error(ar, ret);
2560         }
2561
             /* every entry in nmd, including those carried over unchanged from omd,
                is stamped with the new sequence number */
2562         for (i=0; i<ni; i++) {
2563                 nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
2564         }
2565
2566         /* create the meta data value */
2567         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
2568                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
2569                                        &nmd,
2570                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
2571         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2572                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2573                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2574         }
2575
2576         /*
2577          * when we know that we'll modify the record, add the whenChanged, uSNChanged
2578          * and replPropertyMetaData attributes
2579          */
2580         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
2581         if (ret != LDB_SUCCESS) {
2582                 return replmd_replicated_request_error(ar, ret);
2583         }
2584         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
2585         if (ret != LDB_SUCCESS) {
2586                 return replmd_replicated_request_error(ar, ret);
2587         }
2588         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
2589         if (ret != LDB_SUCCESS) {
2590                 return replmd_replicated_request_error(ar, ret);
2591         }
2592
2593         replmd_ldb_message_sort(msg, ar->schema);
2594
2595         /* we want to replace the old values */
2596         for (i=0; i < msg->num_elements; i++) {
2597                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
2598         }
2599
2600         if (DEBUGLVL(4)) {
2601                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
2602                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
2603                 talloc_free(s);
2604         }
2605
2606         ret = ldb_build_mod_req(&change_req,
2607                                 ldb,
2608                                 ar,
2609                                 msg,
2610                                 ar->controls,
2611                                 ar,
2612                                 replmd_op_callback,
2613                                 ar->req);
2614         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2615
2616         return ldb_next_request(ar->module, change_req);
2617 }
2618
2619 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
2620                                                    struct ldb_reply *ares)
2621 {
2622         struct replmd_replicated_request *ar = talloc_get_type(req->context,
2623                                                struct replmd_replicated_request);
2624         int ret;
2625
2626         if (!ares) {
2627                 return ldb_module_done(ar->req, NULL, NULL,
2628                                         LDB_ERR_OPERATIONS_ERROR);
2629         }
2630         if (ares->error != LDB_SUCCESS &&
2631             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
2632                 return ldb_module_done(ar->req, ares->controls,
2633                                         ares->response, ares->error);
2634         }
2635
2636         switch (ares->type) {
2637         case LDB_REPLY_ENTRY:
2638                 ar->search_msg = talloc_steal(ar, ares->message);
2639                 break;
2640
2641         case LDB_REPLY_REFERRAL:
2642                 /* we ignore referrals */
2643                 break;
2644
2645         case LDB_REPLY_DONE:
2646                 if (ar->search_msg != NULL) {
2647                         ret = replmd_replicated_apply_merge(ar);
2648                 } else {
2649                         ret = replmd_replicated_apply_add(ar);
2650                 }
2651                 if (ret != LDB_SUCCESS) {
2652                         return ldb_module_done(ar->req, NULL, NULL, ret);
2653                 }
2654         }
2655
2656         talloc_free(ares);
2657         return LDB_SUCCESS;
2658 }
2659
2660 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
2661
2662 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
2663 {
2664         struct ldb_context *ldb;
2665         int ret;
2666         char *tmp_str;
2667         char *filter;
2668         struct ldb_request *search_req;
2669         struct ldb_search_options_control *options;
2670
2671         if (ar->index_current >= ar->objs->num_objects) {
2672                 /* done with it, go to next stage */
2673                 return replmd_replicated_uptodate_vector(ar);
2674         }
2675
2676         ldb = ldb_module_get_ctx(ar->module);
2677         ar->search_msg = NULL;
2678
2679         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
2680         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2681
2682         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
2683         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2684         talloc_free(tmp_str);
2685
2686         ret = ldb_build_search_req(&search_req,
2687                                    ldb,
2688                                    ar,
2689                                    NULL,
2690                                    LDB_SCOPE_SUBTREE,
2691                                    filter,
2692                                    NULL,
2693                                    NULL,
2694                                    ar,
2695                                    replmd_replicated_apply_search_callback,
2696                                    ar->req);
2697
2698         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
2699         if (ret != LDB_SUCCESS) {
2700                 return ret;
2701         }
2702
2703         /* we need to cope with cross-partition links, so search for
2704            the GUID over all partitions */
2705         options = talloc(search_req, struct ldb_search_options_control);
2706         if (options == NULL) {
2707                 DEBUG(0, (__location__ ": out of memory\n"));
2708                 return LDB_ERR_OPERATIONS_ERROR;
2709         }
2710         options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
2711
2712         ret = ldb_request_add_control(search_req,
2713                                       LDB_CONTROL_SEARCH_OPTIONS_OID,
2714                                       true, options);
2715         if (ret != LDB_SUCCESS) {
2716                 return ret;
2717         }
2718
2719         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2720
2721         return ldb_next_request(ar->module, search_req);
2722 }
2723
2724 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
2725                                                       struct ldb_reply *ares)
2726 {
2727         struct ldb_context *ldb;
2728         struct replmd_replicated_request *ar = talloc_get_type(req->context,
2729                                                struct replmd_replicated_request);
2730         ldb = ldb_module_get_ctx(ar->module);
2731
2732         if (!ares) {
2733                 return ldb_module_done(ar->req, NULL, NULL,
2734                                         LDB_ERR_OPERATIONS_ERROR);
2735         }
2736         if (ares->error != LDB_SUCCESS) {
2737                 return ldb_module_done(ar->req, ares->controls,
2738                                         ares->response, ares->error);
2739         }
2740
2741         if (ares->type != LDB_REPLY_DONE) {
2742                 ldb_set_errstring(ldb, "Invalid reply type\n!");
2743                 return ldb_module_done(ar->req, NULL, NULL,
2744                                         LDB_ERR_OPERATIONS_ERROR);
2745         }
2746
2747         talloc_free(ares);
2748
2749         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
2750 }
2751
/*
 * Build and send the modify that records replication progress on the entry
 * held in ar->search_msg.
 *
 * Two attributes are written, both with LDB_FLAG_MOD_REPLACE:
 * - replUpToDateVector: the stored vector (ouv) merged with the vector
 *   supplied by the source DSA (ruv, may be NULL) and with a cursor for the
 *   source DSA itself, whose highest_usn is taken from
 *   highwatermark.tmp_highest_usn and whose last_sync_success is "now".
 *   The cursors are sorted before encoding.
 * - repsFrom: all stored values, with the value whose source_dsa_obj_guid
 *   matches the source DSA overwritten (or a new value appended when there
 *   is none) by a freshly encoded repsFromToBlob.
 *
 * The request is handed to the next module; completion is reported through
 * replmd_replicated_uptodate_modify_callback().
 *
 * Returns an LDB error code.
 */
2752 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
2753 {
2754         struct ldb_context *ldb;
2755         struct ldb_request *change_req;
2756         enum ndr_err_code ndr_err;
2757         struct ldb_message *msg;
2758         struct replUpToDateVectorBlob ouv;
2759         const struct ldb_val *ouv_value;
2760         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
2761         struct replUpToDateVectorBlob nuv;
2762         struct ldb_val nuv_value;
2763         struct ldb_message_element *nuv_el = NULL;
2764         const struct GUID *our_invocation_id;
2765         struct ldb_message_element *orf_el = NULL;
2766         struct repsFromToBlob nrf;
2767         struct ldb_val *nrf_value = NULL;
2768         struct ldb_message_element *nrf_el = NULL;
2769         uint32_t i,j,ni=0;
2770         bool found = false;
2771         time_t t = time(NULL);
2772         NTTIME now;
2773         int ret;
2774
2775         ldb = ldb_module_get_ctx(ar->module);
2776         ruv = ar->objs->uptodateness_vector;
2777         ZERO_STRUCT(ouv);
2778         ouv.version = 2;
2779         ZERO_STRUCT(nuv);
2780         nuv.version = 2;
2781
2782         unix_to_nt_time(&now, t);
2783
2784         /*
2785          * first create the new replUpToDateVector
2786          */
2787         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
2788         if (ouv_value) {
2789                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
2790                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
2791                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
2792                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2793                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2794                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2795                 }
2796
2797                 if (ouv.version != 2) {
2798                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2799                 }
2800         }
2801
2802         /*
2803          * the new uptodateness vector will at least
2804          * contain 1 entry, one for the source_dsa
2805          *
2806          * plus optional values from our old vector and the one from the source_dsa
2807          */
2808         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
2809         if (ruv) nuv.ctr.ctr2.count += ruv->count;
2810         nuv.ctr.ctr2.cursors = talloc_array(ar,
2811                                             struct drsuapi_DsReplicaCursor2,
2812                                             nuv.ctr.ctr2.count);
2813         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2814
2815         /* first copy the old vector */
2816         for (i=0; i < ouv.ctr.ctr2.count; i++) {
2817                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
2818                 ni++;
2819         }
2820
2821         /* get our invocation_id if we have one already attached to the ldb */
2822         our_invocation_id = samdb_ntds_invocation_id(ldb);
2823
2824         /* merge in the source_dsa vector if available */
2825         for (i=0; (ruv && i < ruv->count); i++) {
2826                 found = false;
2827
                     /* a cursor that carries our own invocation id is not merged */
2828                 if (our_invocation_id &&
2829                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
2830                                our_invocation_id)) {
2831                         continue;
2832                 }
2833
2834                 for (j=0; j < ni; j++) {
2835                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
2836                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
2837                                 continue;
2838                         }
2839
2840                         found = true;
2841
2842                         /*
2843                          * we update only the highest_usn and not the latest_sync_success time,
2844                          * because the last success stands for direct replication
2845                          */
2846                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
2847                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
2848                         }
2849                         break;                  
2850                 }
2851
2852                 if (found) continue;
2853
2854                 /* if it's not there yet, add it */
2855                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
2856                 ni++;
2857         }
2858
2859         /*
2860          * merge in the current highwatermark for the source_dsa
2861          */
2862         found = false;
2863         for (j=0; j < ni; j++) {
2864                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
2865                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
2866                         continue;
2867                 }
2868
2869                 found = true;
2870
2871                 /*
2872                  * here we update the highest_usn and last_sync_success time
2873                  * because we're directly replicating from the source_dsa
2874                  *
2875                  * and use the tmp_highest_usn because this is what we have just applied
2876                  * to our ldb
2877                  */
2878                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
2879                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
2880                 break;
2881         }
2882         if (!found) {
2883                 /*
2884                  * here we update the highest_usn and last_sync_success time
2885                  * because we're directly replicating from the source_dsa
2886                  *
2887                  * and use the tmp_highest_usn because this is what we have just applied
2888                  * to our ldb
2889                  */
2890                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
2891                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
2892                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
2893                 ni++;
2894         }
2895
2896         /*
2897          * finally correct the size of the cursors array
2898          */
2899         nuv.ctr.ctr2.count = ni;
2900
2901         /*
2902          * sort the cursors
2903          */
2904         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
2905               sizeof(struct drsuapi_DsReplicaCursor2),
2906               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
2907
2908         /*
2909          * create the change ldb_message
2910          */
2911         msg = ldb_msg_new(ar);
2912         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2913         msg->dn = ar->search_msg->dn;
2914
2915         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
2916                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2917                                        &nuv,
2918                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
2919         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2920                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2921                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2922         }
2923         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
2924         if (ret != LDB_SUCCESS) {
2925                 return replmd_replicated_request_error(ar, ret);
2926         }
2927         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
2928
2929         /*
2930          * now create the new repsFrom value from the given repsFromTo1 structure
2931          */
2932         ZERO_STRUCT(nrf);
2933         nrf.version                                     = 1;
2934         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
2935         /* and fix some values... */
2936         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
2937         nrf.ctr.ctr1.last_success                       = now;
2938         nrf.ctr.ctr1.last_attempt                       = now;
2939         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
2940         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
2941
2942         /*
2943          * first see if we already have a repsFrom value for the current source dsa
2944          * if so we'll later replace this value
2945          */
2946         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
2947         if (orf_el) {
2948                 for (i=0; i < orf_el->num_values; i++) {
2949                         struct repsFromToBlob *trf;
2950
2951                         trf = talloc(ar, struct repsFromToBlob);
2952                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2953
2954                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
2955                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
2956                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2957                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2958                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2959                         }
2960
2961                         if (trf->version != 1) {
2962                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2963                         }
2964
2965                         /*
2966                          * we compare the source dsa objectGUID not the invocation_id
2967                          * because we want only one repsFrom value per source dsa
2968                          * and when the invocation_id of the source dsa has changed we don't need 
2969                          * the old repsFrom with the old invocation_id
2970                          */
2971                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
2972                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
2973                                 talloc_free(trf);
2974                                 continue;
2975                         }
2976
2977                         talloc_free(trf);
2978                         nrf_value = &orf_el->values[i];
2979                         break;
2980                 }
2981
2982                 /*
2983                  * copy over all old values to the new ldb_message
2984                  */
2985                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
2986                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
                     /* struct copy: nrf_el now refers to the same values array as the
                        element in search_msg, so nrf_value (if set above) also names
                        a value of nrf_el */
2987                 *nrf_el = *orf_el;
2988         }
2989
2990         /*
2991          * if we haven't found an old repsFrom value for the current source dsa
2992          * we'll add a new value
2993          */
2994         if (!nrf_value) {
2995                 struct ldb_val zero_value;
2996                 ZERO_STRUCT(zero_value);
2997                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
2998                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2999
3000                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
3001         }
3002
3003         /* we now fill the value which is already attached to ldb_message */
3004         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
3005                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
3006                                        &nrf,
3007                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
3008         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
3009                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
3010                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
3011         }
3012
3013         /* 
3014          * the ldb_message_element for the attribute, has all the old values and the new one
3015          * so we'll replace the whole attribute with all values
3016          */
3017         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
3018
3019         if (DEBUGLVL(4)) {
3020                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
3021                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
3022                 talloc_free(s);
3023         }
3024
3025         /* prepare the ldb_modify() request */
3026         ret = ldb_build_mod_req(&change_req,
3027                                 ldb,
3028                                 ar,
3029                                 msg,
3030                                 ar->controls,
3031                                 ar,
3032                                 replmd_replicated_uptodate_modify_callback,
3033                                 ar->req);
3034         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
3035
3036         return ldb_next_request(ar->module, change_req);
3037 }
3038
3039 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
3040                                                       struct ldb_reply *ares)
3041 {
3042         struct replmd_replicated_request *ar = talloc_get_type(req->context,
3043                                                struct replmd_replicated_request);
3044         int ret;
3045
3046         if (!ares) {
3047                 return ldb_module_done(ar->req, NULL, NULL,
3048                                         LDB_ERR_OPERATIONS_ERROR);
3049         }
3050         if (ares->error != LDB_SUCCESS &&
3051             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
3052                 return ldb_module_done(ar->req, ares->controls,
3053                                         ares->response, ares->error);
3054         }
3055
3056         switch (ares->type) {
3057         case LDB_REPLY_ENTRY:
3058                 ar->search_msg = talloc_steal(ar, ares->message);
3059                 break;
3060
3061         case LDB_REPLY_REFERRAL:
3062                 /* we ignore referrals */
3063                 break;
3064
3065         case LDB_REPLY_DONE:
3066                 if (ar->search_msg == NULL) {
3067                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
3068                 } else {
3069                         ret = replmd_replicated_uptodate_modify(ar);
3070                 }
3071                 if (ret != LDB_SUCCESS) {
3072                         return ldb_module_done(ar->req, NULL, NULL, ret);
3073                 }
3074         }
3075
3076         talloc_free(ares);
3077         return LDB_SUCCESS;
3078 }
3079
3080
3081 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
3082 {
3083         struct ldb_context *ldb;
3084         int ret;
3085         static const char *attrs[] = {
3086                 "replUpToDateVector",
3087                 "repsFrom",
3088                 NULL
3089         };
3090         struct ldb_request *search_req;
3091
3092         ldb = ldb_module_get_ctx(ar->module);
3093         ar->search_msg = NULL;
3094
3095         ret = ldb_build_search_req(&search_req,
3096                                    ldb,
3097                                    ar,
3098                                    ar->objs->partition_dn,
3099                                    LDB_SCOPE_BASE,
3100                                    "(objectClass=*)",
3101                                    attrs,
3102                                    NULL,
3103                                    ar,
3104                                    replmd_replicated_uptodate_search_callback,
3105                                    ar->req);
3106         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
3107
3108         return ldb_next_request(ar->module, search_req);
3109 }
3110
3111
3112
3113 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
3114 {
3115         struct ldb_context *ldb;
3116         struct dsdb_extended_replicated_objects *objs;
3117         struct replmd_replicated_request *ar;
3118         struct ldb_control **ctrls;
3119         int ret, i;
3120         struct replmd_private *replmd_private = 
3121                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
3122
3123         ldb = ldb_module_get_ctx(module);
3124
3125         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
3126
3127         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
3128         if (!objs) {
3129                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
3130                 return LDB_ERR_PROTOCOL_ERROR;
3131         }
3132
3133         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
3134                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
3135                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
3136                 return LDB_ERR_PROTOCOL_ERROR;
3137         }
3138
3139         ar = replmd_ctx_init(module, req);
3140         if (!ar)
3141                 return LDB_ERR_OPERATIONS_ERROR;
3142
3143         /* Set the flags to have the replmd_op_callback run over the full set of objects */
3144         ar->apply_mode = true;
3145         ar->objs = objs;
3146         ar->schema = dsdb_get_schema(ldb);
3147         if (!ar->schema) {
3148                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
3149                 talloc_free(ar);
3150                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
3151                 return LDB_ERR_CONSTRAINT_VIOLATION;
3152         }
3153
3154         ctrls = req->controls;
3155
3156         if (req->controls) {
3157                 req->controls = talloc_memdup(ar, req->controls,
3158                                               talloc_get_size(req->controls));
3159                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
3160         }
3161
3162         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
3163         if (ret != LDB_SUCCESS) {
3164                 return ret;
3165         }
3166
3167         ar->controls = req->controls;
3168         req->controls = ctrls;
3169
3170         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
3171
3172         /* save away the linked attributes for the end of the
3173            transaction */
3174         for (i=0; i<ar->objs->linked_attributes_count; i++) {
3175                 struct la_entry *la_entry;
3176
3177                 if (replmd_private->la_ctx == NULL) {
3178                         replmd_private->la_ctx = talloc_new(replmd_private);
3179                 }
3180                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
3181                 if (la_entry == NULL) {
3182                         ldb_oom(ldb);
3183                         return LDB_ERR_OPERATIONS_ERROR;
3184                 }
3185                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
3186                 if (la_entry->la == NULL) {
3187                         talloc_free(la_entry);
3188                         ldb_oom(ldb);
3189                         return LDB_ERR_OPERATIONS_ERROR;
3190                 }
3191                 *la_entry->la = ar->objs->linked_attributes[i];
3192
3193                 /* we need to steal the non-scalars so they stay
3194                    around until the end of the transaction */
3195                 talloc_steal(la_entry->la, la_entry->la->identifier);
3196                 talloc_steal(la_entry->la, la_entry->la->value.blob);
3197
3198                 DLIST_ADD(replmd_private->la_list, la_entry);
3199         }
3200
3201         return replmd_replicated_apply_next(ar);
3202 }
3203
3204 /*
3205   process one linked attribute structure
3206  */
3207 static int replmd_process_linked_attribute(struct ldb_module *module,
3208                                            struct la_entry *la_entry)
3209 {                                          
3210         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
3211         struct ldb_context *ldb = ldb_module_get_ctx(module);
3212         struct dsdb_schema *schema = dsdb_get_schema(ldb);
3213         struct drsuapi_DsReplicaObjectIdentifier3 target;
3214         struct ldb_message *msg;
3215         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
3216         struct ldb_request *mod_req;
3217         int ret;
3218         const struct dsdb_attribute *attr;
3219         struct ldb_dn *target_dn;
3220         struct dsdb_dn *dsdb_dn;
3221         uint64_t seq_num = 0;
3222         struct drsuapi_DsReplicaAttribute drs;
3223         struct drsuapi_DsAttributeValue val;
3224         struct ldb_message_element el;
3225         const struct ldb_val *guid;
3226         WERROR status;
3227         time_t t = time(NULL);
3228
3229         drs.value_ctr.num_values = 1;
3230         drs.value_ctr.values = &val;
3231         val.blob = la->value.blob;
3232
3233 /*
3234 linked_attributes[0]:                                                     
3235      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
3236         identifier               : *                                      
3237             identifier: struct drsuapi_DsReplicaObjectIdentifier          
3238                 __ndr_size               : 0x0000003a (58)                
3239                 __ndr_size_sid           : 0x00000000 (0)                 
3240                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
3241                 sid                      : S-0-0                               
3242                 __ndr_size_dn            : 0x00000000 (0)                      
3243                 dn                       : ''                                  
3244         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
3245         value: struct drsuapi_DsAttributeValue                                 
3246             __ndr_size               : 0x0000007e (126)                        
3247             blob                     : *                                       
3248                 blob                     : DATA_BLOB length=126                
3249         flags                    : 0x00000001 (1)                              
3250                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
3251         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
3252         meta_data: struct drsuapi_DsReplicaMetaData                            
3253             version                  : 0x00000015 (21)                         
3254             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
3255             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
3256             originating_usn          : 0x000000000001e19c (123292)             
3257
3258 (for cases where the link is to a normal DN)
3259      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
3260         __ndr_size               : 0x0000007e (126)                            
3261         __ndr_size_sid           : 0x0000001c (28)                             
3262         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
3263         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
3264         __ndr_size_dn            : 0x00000022 (34)                                
3265         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
3266  */
3267         
3268         /* find the attribute being modified */
3269         attr = dsdb_attribute_by_attributeID_id(schema, la->attid);
3270         if (attr == NULL) {
3271                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
3272                 talloc_free(tmp_ctx);
3273                 return LDB_ERR_OPERATIONS_ERROR;
3274         }
3275
3276         status = attr->syntax->drsuapi_to_ldb(ldb, schema, attr, &drs, tmp_ctx, &el);
3277
3278         /* construct a modify request for this attribute change */
3279         msg = ldb_msg_new(tmp_ctx);
3280         if (!msg) {
3281                 ldb_oom(ldb);
3282                 talloc_free(tmp_ctx);
3283                 return LDB_ERR_OPERATIONS_ERROR;
3284         }
3285
3286         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
3287                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
3288         if (ret != LDB_SUCCESS) {
3289                 talloc_free(tmp_ctx);
3290                 return ret;
3291         }
3292
3293         el.name = attr->lDAPDisplayName;
3294         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
3295                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_ADD);
3296         } else {
3297                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_DELETE);
3298         }
3299         if (ret != LDB_SUCCESS) {
3300                 talloc_free(tmp_ctx);
3301                 return ret;
3302         }
3303
3304         dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, &el.values[0], attr->syntax->ldap_oid);
3305         if (!dsdb_dn) {
3306                 DEBUG(0,(__location__ ": Failed to parse just-generated DN\n"));
3307                 talloc_free(tmp_ctx);
3308                 return LDB_ERR_INVALID_DN_SYNTAX;
3309         }
3310
3311         guid = ldb_dn_get_extended_component(dsdb_dn->dn, "GUID");
3312         if (!guid) {
3313                 DEBUG(0,(__location__ ": Failed to parse GUID from just-generated DN\n"));
3314                 talloc_free(tmp_ctx);
3315                 return LDB_ERR_INVALID_DN_SYNTAX;
3316         }
3317
3318         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, ldb_binary_encode(tmp_ctx, *guid), &target_dn);
3319         if (ret != LDB_SUCCESS) {
3320                 /* If this proves to be a problem in the future, then
3321                  * just remove the return - perhaps we can just use
3322                  * the details the replication peer supplied */
3323
3324                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
3325                 talloc_free(tmp_ctx);
3326                 return LDB_ERR_OPERATIONS_ERROR;
3327         } else {
3328
3329                 /* Now update with full DN we just found in the DB (including extended components) */
3330                 dsdb_dn->dn = target_dn;
3331                 /* Now make a linearized version, using the original binary components (if any) */
3332                 el.values[0] = data_blob_string_const(dsdb_dn_get_extended_linearized(tmp_ctx, dsdb_dn, 1));
3333         }
3334
3335         ret = replmd_update_rpmd(module, schema, msg, &seq_num, t);
3336         if (ret != LDB_SUCCESS) {
3337                 talloc_free(tmp_ctx);
3338                 return ret;
3339         }
3340
3341         /* we only change whenChanged and uSNChanged if the seq_num
3342            has changed */
3343         if (seq_num != 0) {
3344                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
3345                         talloc_free(tmp_ctx);
3346                         return LDB_ERR_OPERATIONS_ERROR;
3347                 }
3348
3349                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
3350                         talloc_free(tmp_ctx);
3351                         return LDB_ERR_OPERATIONS_ERROR;
3352                 }
3353         }
3354
3355         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
3356                                 msg,
3357                                 NULL,
3358                                 NULL, 
3359                                 ldb_op_default_callback,
3360                                 NULL);
3361         if (ret != LDB_SUCCESS) {
3362                 talloc_free(tmp_ctx);
3363                 return ret;
3364         }
3365         talloc_steal(mod_req, msg);
3366
3367         if (DEBUGLVL(4)) {
3368                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
3369                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
3370         }
3371
3372         /* Run the new request */
3373         ret = ldb_next_request(module, mod_req);
3374
3375         /* we need to wait for this to finish, as we are being called
3376            from the synchronous end_transaction hook of this module */
3377         if (ret == LDB_SUCCESS) {
3378                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
3379         }
3380
3381         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
3382                 /* the link destination exists, we need to update it
3383                  * by deleting the old one for the same DN then adding
3384                  * the new one */
3385                 msg->elements = talloc_realloc(msg, msg->elements,
3386                                                struct ldb_message_element,
3387                                                msg->num_elements+1);
3388                 if (msg->elements == NULL) {
3389                         ldb_oom(ldb);
3390                         talloc_free(tmp_ctx);
3391                         return LDB_ERR_OPERATIONS_ERROR;
3392                 }
3393                 /* this relies on the backend matching the old entry
3394                    only by the DN portion of the extended DN */
3395                 msg->elements[1] = msg->elements[0];
3396                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
3397                 msg->num_elements++;
3398
3399                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
3400                                         msg,
3401                                         NULL,
3402                                         NULL, 
3403                                         ldb_op_default_callback,
3404                                         NULL);
3405                 if (ret != LDB_SUCCESS) {
3406                         talloc_free(tmp_ctx);
3407                         return ret;
3408                 }
3409
3410                 /* Run the new request */
3411                 ret = ldb_next_request(module, mod_req);
3412                 
3413                 if (ret == LDB_SUCCESS) {
3414                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
3415                 }
3416         }
3417
3418         if (ret != LDB_SUCCESS) {
3419                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
3420                           ldb_errstring(ldb),
3421                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
3422                 ret = LDB_SUCCESS;
3423         }
3424         
3425         talloc_free(tmp_ctx);
3426
3427         return ret;     
3428 }
3429
3430 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
3431 {
3432         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
3433                 return replmd_extended_replicated_objects(module, req);
3434         }
3435
3436         return ldb_next_request(module, req);
3437 }
3438
3439
3440 /*
3441   we hook into the transaction operations to allow us to 
3442   perform the linked attribute updates at the end of the whole
3443   transaction. This allows a forward linked attribute to be created
3444   before the object is created. During a vampire, w2k8 sends us linked
3445   attributes before the objects they are part of.
3446  */
3447 static int replmd_start_transaction(struct ldb_module *module)
3448 {
3449         /* create our private structure for this transaction */
3450         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
3451                                                                 struct replmd_private);
3452         replmd_txn_cleanup(replmd_private);
3453
3454         /* free any leftover mod_usn records from cancelled
3455            transactions */
3456         while (replmd_private->ncs) {
3457                 struct nc_entry *e = replmd_private->ncs;
3458                 DLIST_REMOVE(replmd_private->ncs, e);
3459                 talloc_free(e);
3460         }
3461
3462         return ldb_next_start_trans(module);
3463 }
3464
3465 /*
3466   on prepare commit we loop over our queued la_context structures and
3467   apply each of them  
3468  */
3469 static int replmd_prepare_commit(struct ldb_module *module)
3470 {
3471         struct replmd_private *replmd_private = 
3472                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
3473         struct la_entry *la, *prev;
3474         struct la_backlink *bl;
3475         int ret;
3476
3477         /* walk the list backwards, to do the first entry first, as we
3478          * added the entries with DLIST_ADD() which puts them at the
3479          * start of the list */
3480         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
3481
3482         for (; la; la=prev) {
3483                 prev = la->prev;
3484                 DLIST_REMOVE(replmd_private->la_list, la);
3485                 ret = replmd_process_linked_attribute(module, la);
3486                 if (ret != LDB_SUCCESS) {
3487                         replmd_txn_cleanup(replmd_private);
3488                         return ret;
3489                 }
3490         }
3491
3492         /* process our backlink list, creating and deleting backlinks
3493            as necessary */
3494         for (bl=replmd_private->la_backlinks; bl; bl=bl->next) {
3495                 ret = replmd_process_backlink(module, bl);
3496                 if (ret != LDB_SUCCESS) {
3497                         replmd_txn_cleanup(replmd_private);
3498                         return ret;
3499                 }
3500         }
3501
3502         replmd_txn_cleanup(replmd_private);
3503
3504         /* possibly change @REPLCHANGED */
3505         ret = replmd_notify_store(module);
3506         if (ret != LDB_SUCCESS) {
3507                 return ret;
3508         }
3509         
3510         return ldb_next_prepare_commit(module);
3511 }
3512
3513 static int replmd_del_transaction(struct ldb_module *module)
3514 {
3515         struct replmd_private *replmd_private = 
3516                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
3517         replmd_txn_cleanup(replmd_private);
3518
3519         return ldb_next_del_trans(module);
3520 }
3521
3522
3523 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
3524         .name          = "repl_meta_data",
3525         .init_context      = replmd_init,
3526         .add               = replmd_add,
3527         .modify            = replmd_modify,
3528         .rename            = replmd_rename,
3529         .del               = replmd_delete,
3530         .extended          = replmd_extended,
3531         .start_transaction = replmd_start_transaction,
3532         .prepare_commit    = replmd_prepare_commit,
3533         .del_transaction   = replmd_del_transaction,
3534 };