s4-dsdb: added support for backlinks in repl_meta_data
[nivanova/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48 #include "dsdb/samdb/ldb_modules/util.h"
49 #include "lib/util/binsearch.h"
50
51 #define W2K3_LINKED_ATTRIBUTES 1
52
53 struct replmd_private {
54         TALLOC_CTX *la_ctx;
55         struct la_entry *la_list;
56         TALLOC_CTX *bl_ctx;
57         struct la_backlink *la_backlinks;
58         struct nc_entry {
59                 struct nc_entry *prev, *next;
60                 struct ldb_dn *dn;
61                 uint64_t mod_usn;
62         } *ncs;
63 };
64
/* doubly-linked list node carrying one replicated linked attribute */
struct la_entry {
	struct la_entry *next;
	struct la_entry *prev;
	struct drsuapi_DsReplicaLinkedAttribute *la;
};
69
/*
 * Per-request context, created by replmd_ctx_init() and handed to the
 * request callbacks of this module.
 */
struct replmd_replicated_request {
	/* module and original request this context belongs to */
	struct ldb_module *module;
	struct ldb_request *req;

	/* schema looked up when the context was created */
	const struct dsdb_schema *schema;

	/* the controls we pass down */
	struct ldb_control **controls;

	/*
	 * details for the mode where we apply a bunch of inbound
	 * replication messages
	 */
	bool apply_mode;
	uint32_t index_current;
	struct dsdb_extended_replicated_objects *objs;

	struct ldb_message *search_msg;

	/* sequence number assigned to this operation (0 means none recorded) */
	uint64_t seq_num;
};
89
90 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
91
92
93 /*
94   initialise the module
95   allocate the private structure and build the list
96   of partition DNs for use by replmd_notify()
97  */
98 static int replmd_init(struct ldb_module *module)
99 {
100         struct replmd_private *replmd_private;
101         struct ldb_context *ldb = ldb_module_get_ctx(module);
102
103         replmd_private = talloc_zero(module, struct replmd_private);
104         if (replmd_private == NULL) {
105                 ldb_oom(ldb);
106                 return LDB_ERR_OPERATIONS_ERROR;
107         }
108         ldb_module_set_private(module, replmd_private);
109
110         return ldb_next_init(module);
111 }
112
113 /*
114   cleanup our per-transaction contexts
115  */
116 static void replmd_txn_cleanup(struct replmd_private *replmd_private)
117 {
118         talloc_free(replmd_private->la_ctx);
119         replmd_private->la_list = NULL;
120         replmd_private->la_ctx = NULL;
121
122         talloc_free(replmd_private->bl_ctx);
123         replmd_private->la_backlinks = NULL;
124         replmd_private->bl_ctx = NULL;
125 }
126
127
128 struct la_backlink {
129         struct la_backlink *next, *prev;
130         const char *attr_name;
131         struct GUID forward_guid, target_guid;
132         bool active;
133 };
134
135 /*
136   add a backlink to the list of backlinks to add/delete in the prepare
137   commit
138  */
139 static int replmd_add_backlink(struct ldb_module *module, const struct dsdb_schema *schema,
140                                struct GUID *forward_guid, struct GUID *target_guid,
141                                bool active, const struct dsdb_attribute *schema_attr)
142 {
143         const struct dsdb_attribute *target_attr;
144         struct la_backlink *bl;
145         struct replmd_private *replmd_private =
146                 talloc_get_type_abort(ldb_module_get_private(module), struct replmd_private);
147
148         target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID + 1);
149         if (!target_attr) {
150                 /*
151                  * windows 2003 has a broken schema where the
152                  * definition of msDS-IsDomainFor is missing (which is
153                  * supposed to be the backlink of the
154                  * msDS-HasDomainNCs attribute
155                  */
156                 return LDB_SUCCESS;
157         }
158
159         /* see if its already in the list */
160         for (bl=replmd_private->la_backlinks; bl; bl=bl->next) {
161                 if (GUID_equal(forward_guid, &bl->forward_guid) &&
162                     GUID_equal(target_guid, &bl->target_guid) &&
163                     (target_attr->lDAPDisplayName == bl->attr_name ||
164                      strcmp(target_attr->lDAPDisplayName, bl->attr_name) == 0)) {
165                         break;
166                 }
167         }
168
169         if (bl) {
170                 /* we found an existing one */
171                 if (bl->active == active) {
172                         return LDB_SUCCESS;
173                 }
174                 DLIST_REMOVE(replmd_private->la_backlinks, bl);
175                 talloc_free(bl);
176                 return LDB_SUCCESS;
177         }
178
179         if (replmd_private->bl_ctx == NULL) {
180                 replmd_private->bl_ctx = talloc_new(replmd_private);
181                 if (replmd_private->bl_ctx == NULL) {
182                         ldb_module_oom(module);
183                         return LDB_ERR_OPERATIONS_ERROR;
184                 }
185         }
186
187         /* its a new one */
188         bl = talloc(replmd_private->bl_ctx, struct la_backlink);
189         if (bl == NULL) {
190                 ldb_module_oom(module);
191                 return LDB_ERR_OPERATIONS_ERROR;
192         }
193
194         bl->attr_name = target_attr->lDAPDisplayName;
195         bl->forward_guid = *forward_guid;
196         bl->target_guid = *target_guid;
197         bl->active = active;
198
199         DLIST_ADD(replmd_private->la_backlinks, bl);
200
201         return LDB_SUCCESS;
202 }
203
204 /*
205   process the list of backlinks we accumulated during
206   a transaction, adding and deleting the backlinks
207   from the target objects
208  */
209 static int replmd_process_backlink(struct ldb_module *module, struct la_backlink *bl)
210 {
211         struct ldb_dn *target_dn, *source_dn;
212         int ret;
213         struct ldb_context *ldb = ldb_module_get_ctx(module);
214         struct ldb_message *msg;
215         TALLOC_CTX *tmp_ctx = talloc_new(bl);
216         char *dn_string;
217
218         /*
219           - find DN of target
220           - find DN of source
221           - construct ldb_message
222               - either an add or a delete
223          */
224         ret = dsdb_module_dn_by_guid(module, tmp_ctx, &bl->target_guid, &target_dn);
225         if (ret != LDB_SUCCESS) {
226                 ldb_asprintf_errstring(ldb, "Failed to find target DN for linked attribute with GUID %s\n",
227                                        GUID_string(bl, &bl->target_guid));
228                 talloc_free(tmp_ctx);
229                 return ret;
230         }
231
232         ret = dsdb_module_dn_by_guid(module, tmp_ctx, &bl->forward_guid, &source_dn);
233         if (ret != LDB_SUCCESS) {
234                 ldb_asprintf_errstring(ldb, "Failed to find source DN for linked attribute with GUID %s\n",
235                                        GUID_string(bl, &bl->forward_guid));
236                 talloc_free(tmp_ctx);
237                 return ret;
238         }
239
240         msg = ldb_msg_new(tmp_ctx);
241         if (msg == NULL) {
242                 ldb_module_oom(module);
243                 talloc_free(tmp_ctx);
244                 return LDB_ERR_OPERATIONS_ERROR;
245         }
246
247         /* construct a ldb_message for adding/deleting the backlink */
248         msg->dn = target_dn;
249         dn_string = ldb_dn_get_extended_linearized(tmp_ctx, source_dn, 1);
250         if (!dn_string) {
251                 ldb_module_oom(module);
252                 talloc_free(tmp_ctx);
253                 return LDB_ERR_OPERATIONS_ERROR;
254         }
255         ret = ldb_msg_add_steal_string(msg, bl->attr_name, dn_string);
256         if (ret != LDB_SUCCESS) {
257                 talloc_free(tmp_ctx);
258                 return ret;
259         }
260         msg->elements[0].flags = bl->active?LDB_FLAG_MOD_ADD:LDB_FLAG_MOD_DELETE;
261
262         ret = dsdb_module_modify(module, msg, 0);
263         if (ret != LDB_SUCCESS) {
264                 ldb_asprintf_errstring(ldb, "Failed to %s backlink from %s to %s - %s",
265                                        bl->active?"add":"remove",
266                                        ldb_dn_get_linearized(source_dn),
267                                        ldb_dn_get_linearized(target_dn),
268                                        ldb_errstring(ldb));
269                 talloc_free(tmp_ctx);
270                 return ret;
271         }
272         talloc_free(tmp_ctx);
273         return ret;
274 }
275
276
277 /*
278  * Callback for most write operations in this module:
279  * 
280  * notify the repl task that a object has changed. The notifies are
281  * gathered up in the replmd_private structure then written to the
282  * @REPLCHANGED object in each partition during the prepare_commit
283  */
284 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
285 {
286         int ret;
287         struct replmd_replicated_request *ac = 
288                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
289         struct replmd_private *replmd_private = 
290                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
291         struct nc_entry *modified_partition;
292         struct ldb_control *partition_ctrl;
293         const struct dsdb_control_current_partition *partition;
294
295         struct ldb_control **controls;
296
297         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
298
299         /* Remove the 'partition' control from what we pass up the chain */
300         controls = controls_except_specified(ares->controls, ares, partition_ctrl);
301
302         if (ares->error != LDB_SUCCESS) {
303                 return ldb_module_done(ac->req, controls,
304                                         ares->response, ares->error);
305         }
306
307         if (ares->type != LDB_REPLY_DONE) {
308                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
309                 return ldb_module_done(ac->req, NULL,
310                                        NULL, LDB_ERR_OPERATIONS_ERROR);
311         }
312
313         if (!partition_ctrl) {
314                 return ldb_module_done(ac->req, NULL,
315                                        NULL, LDB_ERR_OPERATIONS_ERROR);
316         }
317
318         partition = talloc_get_type_abort(partition_ctrl->data,
319                                     struct dsdb_control_current_partition);
320         
321         if (ac->seq_num > 0) {
322                 for (modified_partition = replmd_private->ncs; modified_partition; 
323                      modified_partition = modified_partition->next) {
324                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
325                                 break;
326                         }
327                 }
328                 
329                 if (modified_partition == NULL) {
330                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
331                         if (!modified_partition) {
332                                 ldb_oom(ldb_module_get_ctx(ac->module));
333                                 return ldb_module_done(ac->req, NULL,
334                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
335                         }
336                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
337                         if (!modified_partition->dn) {
338                                 ldb_oom(ldb_module_get_ctx(ac->module));
339                                 return ldb_module_done(ac->req, NULL,
340                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
341                         }
342                         DLIST_ADD(replmd_private->ncs, modified_partition);
343                 }
344
345                 if (ac->seq_num > modified_partition->mod_usn) {
346                         modified_partition->mod_usn = ac->seq_num;
347                 }
348         }
349
350         if (ac->apply_mode) {
351                 talloc_free(ares);
352                 ac->index_current++;
353                 
354                 ret = replmd_replicated_apply_next(ac);
355                 if (ret != LDB_SUCCESS) {
356                         return ldb_module_done(ac->req, NULL, NULL, ret);
357                 }
358                 return ret;
359         } else {
360                 /* free the partition control container here, for the
361                  * common path.  Other cases will have it cleaned up
362                  * eventually with the ares */
363                 talloc_free(partition_ctrl);
364                 return ldb_module_done(ac->req, 
365                                        controls_except_specified(controls, ares, partition_ctrl),
366                                        ares->response, LDB_SUCCESS);
367         }
368 }
369
370
371 /*
372  * update a @REPLCHANGED record in each partition if there have been
373  * any writes of replicated data in the partition
374  */
375 static int replmd_notify_store(struct ldb_module *module)
376 {
377         struct replmd_private *replmd_private = 
378                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
379         struct ldb_context *ldb = ldb_module_get_ctx(module);
380
381         while (replmd_private->ncs) {
382                 int ret;
383                 struct nc_entry *modified_partition = replmd_private->ncs;
384
385                 ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
386                 if (ret != LDB_SUCCESS) {
387                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
388                                  ldb_dn_get_linearized(modified_partition->dn)));
389                         return ret;
390                 }
391                 DLIST_REMOVE(replmd_private->ncs, modified_partition);
392                 talloc_free(modified_partition);
393         }
394
395         return LDB_SUCCESS;
396 }
397
398
399 /*
400   created a replmd_replicated_request context
401  */
402 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
403                                                          struct ldb_request *req)
404 {
405         struct ldb_context *ldb;
406         struct replmd_replicated_request *ac;
407
408         ldb = ldb_module_get_ctx(module);
409
410         ac = talloc_zero(req, struct replmd_replicated_request);
411         if (ac == NULL) {
412                 ldb_oom(ldb);
413                 return NULL;
414         }
415
416         ac->module = module;
417         ac->req = req;
418
419         ac->schema = dsdb_get_schema(ldb);
420         if (!ac->schema) {
421                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
422                               "replmd_modify: no dsdb_schema loaded");
423                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
424                 return NULL;
425         }
426
427         return ac;
428 }
429
430 /*
431   add a time element to a record
432 */
433 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
434 {
435         struct ldb_message_element *el;
436         char *s;
437
438         if (ldb_msg_find_element(msg, attr) != NULL) {
439                 return LDB_SUCCESS;
440         }
441
442         s = ldb_timestring(msg, t);
443         if (s == NULL) {
444                 return LDB_ERR_OPERATIONS_ERROR;
445         }
446
447         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
448                 return LDB_ERR_OPERATIONS_ERROR;
449         }
450
451         el = ldb_msg_find_element(msg, attr);
452         /* always set as replace. This works because on add ops, the flag
453            is ignored */
454         el->flags = LDB_FLAG_MOD_REPLACE;
455
456         return LDB_SUCCESS;
457 }
458
459 /*
460   add a uint64_t element to a record
461 */
462 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
463 {
464         struct ldb_message_element *el;
465
466         if (ldb_msg_find_element(msg, attr) != NULL) {
467                 return LDB_SUCCESS;
468         }
469
470         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
471                 return LDB_ERR_OPERATIONS_ERROR;
472         }
473
474         el = ldb_msg_find_element(msg, attr);
475         /* always set as replace. This works because on add ops, the flag
476            is ignored */
477         el->flags = LDB_FLAG_MOD_REPLACE;
478
479         return LDB_SUCCESS;
480 }
481
482 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
483                                                    const struct replPropertyMetaData1 *m2,
484                                                    const uint32_t *rdn_attid)
485 {
486         if (m1->attid == m2->attid) {
487                 return 0;
488         }
489
490         /*
491          * the rdn attribute should be at the end!
492          * so we need to return a value greater than zero
493          * which means m1 is greater than m2
494          */
495         if (m1->attid == *rdn_attid) {
496                 return 1;
497         }
498
499         /*
500          * the rdn attribute should be at the end!
501          * so we need to return a value less than zero
502          * which means m2 is greater than m1
503          */
504         if (m2->attid == *rdn_attid) {
505                 return -1;
506         }
507
508         return m1->attid > m2->attid ? 1 : -1;
509 }
510
511 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
512                                                 const struct dsdb_schema *schema,
513                                                 struct ldb_dn *dn)
514 {
515         const char *rdn_name;
516         const struct dsdb_attribute *rdn_sa;
517
518         rdn_name = ldb_dn_get_rdn_name(dn);
519         if (!rdn_name) {
520                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
521                 return LDB_ERR_OPERATIONS_ERROR;
522         }
523
524         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
525         if (rdn_sa == NULL) {
526                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
527                 return LDB_ERR_OPERATIONS_ERROR;                
528         }
529
530         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
531                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
532
533         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
534                   discard_const_p(void, &rdn_sa->attributeID_id), 
535                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
536
537         return LDB_SUCCESS;
538 }
539
540 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
541                                                  const struct ldb_message_element *e2,
542                                                  const struct dsdb_schema *schema)
543 {
544         const struct dsdb_attribute *a1;
545         const struct dsdb_attribute *a2;
546
547         /* 
548          * TODO: make this faster by caching the dsdb_attribute pointer
549          *       on the ldb_messag_element
550          */
551
552         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
553         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
554
555         /*
556          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
557          *       in the schema
558          */
559         if (!a1 || !a2) {
560                 return strcasecmp(e1->name, e2->name);
561         }
562         if (a1->attributeID_id == a2->attributeID_id) {
563                 return 0;
564         }
565         return a1->attributeID_id > a2->attributeID_id ? 1 : -1;
566 }
567
568 static void replmd_ldb_message_sort(struct ldb_message *msg,
569                                     const struct dsdb_schema *schema)
570 {
571         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
572                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
573 }
574
575 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
576 {
577         struct ldb_context *ldb;
578         struct ldb_control *control;
579         struct replmd_replicated_request *ac;
580         enum ndr_err_code ndr_err;
581         struct ldb_request *down_req;
582         struct ldb_message *msg;
583         const DATA_BLOB *guid_blob;
584         struct GUID guid;
585         struct replPropertyMetaDataBlob nmd;
586         struct ldb_val nmd_value;
587         const struct GUID *our_invocation_id;
588         time_t t = time(NULL);
589         NTTIME now;
590         char *time_str;
591         int ret;
592         uint32_t i, ni=0;
593         bool allow_add_guid = false;
594         bool remove_current_guid = false;
595
596         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
597         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
598         if (control) {
599                 allow_add_guid = 1;
600         }
601
602         /* do not manipulate our control entries */
603         if (ldb_dn_is_special(req->op.add.message->dn)) {
604                 return ldb_next_request(module, req);
605         }
606
607         ldb = ldb_module_get_ctx(module);
608
609         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
610
611         ac = replmd_ctx_init(module, req);
612         if (!ac) {
613                 return LDB_ERR_OPERATIONS_ERROR;
614         }
615
616         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
617         if ( guid_blob != NULL ) {
618                 if( !allow_add_guid ) {
619                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
620                               "replmd_add: it's not allowed to add an object with objectGUID\n");
621                         talloc_free(ac);
622                         return LDB_ERR_UNWILLING_TO_PERFORM;
623                 } else {
624                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
625                         if ( !NT_STATUS_IS_OK(status)) {
626                                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
627                                       "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
628                                 talloc_free(ac);
629                                 return LDB_ERR_UNWILLING_TO_PERFORM;
630                         }
631                         /* we remove this attribute as it can be a string and will not be treated 
632                         correctly and then we will readd it latter on in the good format*/
633                         remove_current_guid = true;
634                 }
635         } else {
636                 /* a new GUID */
637                 guid = GUID_random();
638         }
639
640         /* Get a sequence number from the backend */
641         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
642         if (ret != LDB_SUCCESS) {
643                 talloc_free(ac);
644                 return ret;
645         }
646
647         /* get our invocationId */
648         our_invocation_id = samdb_ntds_invocation_id(ldb);
649         if (!our_invocation_id) {
650                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
651                               "replmd_add: unable to find invocationId\n");
652                 talloc_free(ac);
653                 return LDB_ERR_OPERATIONS_ERROR;
654         }
655
656         /* we have to copy the message as the caller might have it as a const */
657         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
658         if (msg == NULL) {
659                 ldb_oom(ldb);
660                 talloc_free(ac);
661                 return LDB_ERR_OPERATIONS_ERROR;
662         }
663
664         /* generated times */
665         unix_to_nt_time(&now, t);
666         time_str = ldb_timestring(msg, t);
667         if (!time_str) {
668                 ldb_oom(ldb);
669                 talloc_free(ac);
670                 return LDB_ERR_OPERATIONS_ERROR;
671         }
672         if (remove_current_guid) {
673                 ldb_msg_remove_attr(msg,"objectGUID");
674         }
675
676         /* 
677          * remove autogenerated attributes
678          */
679         ldb_msg_remove_attr(msg, "whenCreated");
680         ldb_msg_remove_attr(msg, "whenChanged");
681         ldb_msg_remove_attr(msg, "uSNCreated");
682         ldb_msg_remove_attr(msg, "uSNChanged");
683         ldb_msg_remove_attr(msg, "replPropertyMetaData");
684
685         /*
686          * readd replicated attributes
687          */
688         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
689         if (ret != LDB_SUCCESS) {
690                 ldb_oom(ldb);
691                 talloc_free(ac);
692                 return ret;
693         }
694
695         /* build the replication meta_data */
696         ZERO_STRUCT(nmd);
697         nmd.version             = 1;
698         nmd.ctr.ctr1.count      = msg->num_elements;
699         nmd.ctr.ctr1.array      = talloc_array(msg,
700                                                struct replPropertyMetaData1,
701                                                nmd.ctr.ctr1.count);
702         if (!nmd.ctr.ctr1.array) {
703                 ldb_oom(ldb);
704                 talloc_free(ac);
705                 return LDB_ERR_OPERATIONS_ERROR;
706         }
707
708         for (i=0; i < msg->num_elements; i++) {
709                 struct ldb_message_element *e = &msg->elements[i];
710                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
711                 const struct dsdb_attribute *sa;
712
713                 if (e->name[0] == '@') continue;
714
715                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
716                 if (!sa) {
717                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
718                                       "replmd_add: attribute '%s' not defined in schema\n",
719                                       e->name);
720                         talloc_free(ac);
721                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
722                 }
723
724                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
725                         /* if the attribute is not replicated (0x00000001)
726                          * or constructed (0x00000004) it has no metadata
727                          */
728                         continue;
729                 }
730
731                 m->attid                        = sa->attributeID_id;
732                 m->version                      = 1;
733                 m->originating_change_time      = now;
734                 m->originating_invocation_id    = *our_invocation_id;
735                 m->originating_usn              = ac->seq_num;
736                 m->local_usn                    = ac->seq_num;
737                 ni++;
738         }
739
740         /* fix meta data count */
741         nmd.ctr.ctr1.count = ni;
742
743         /*
744          * sort meta data array, and move the rdn attribute entry to the end
745          */
746         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
747         if (ret != LDB_SUCCESS) {
748                 talloc_free(ac);
749                 return ret;
750         }
751
752         /* generated NDR encoded values */
753         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
754                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
755                                        &nmd,
756                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
757         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
758                 ldb_oom(ldb);
759                 talloc_free(ac);
760                 return LDB_ERR_OPERATIONS_ERROR;
761         }
762
763         /*
764          * add the autogenerated values
765          */
766         ret = dsdb_msg_add_guid(msg, &guid, "objectGUID");
767         if (ret != LDB_SUCCESS) {
768                 ldb_oom(ldb);
769                 talloc_free(ac);
770                 return ret;
771         }
772         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
773         if (ret != LDB_SUCCESS) {
774                 ldb_oom(ldb);
775                 talloc_free(ac);
776                 return ret;
777         }
778         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
779         if (ret != LDB_SUCCESS) {
780                 ldb_oom(ldb);
781                 talloc_free(ac);
782                 return ret;
783         }
784         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
785         if (ret != LDB_SUCCESS) {
786                 ldb_oom(ldb);
787                 talloc_free(ac);
788                 return ret;
789         }
790         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
791         if (ret != LDB_SUCCESS) {
792                 ldb_oom(ldb);
793                 talloc_free(ac);
794                 return ret;
795         }
796
797         /*
798          * sort the attributes by attid before storing the object
799          */
800         replmd_ldb_message_sort(msg, ac->schema);
801
802         ret = ldb_build_add_req(&down_req, ldb, ac,
803                                 msg,
804                                 req->controls,
805                                 ac, replmd_op_callback,
806                                 req);
807         if (ret != LDB_SUCCESS) {
808                 talloc_free(ac);
809                 return ret;
810         }
811
812         /* mark the control done */
813         if (control) {
814                 control->critical = 0;
815         }
816
817         /* go on with the call chain */
818         return ldb_next_request(module, down_req);
819 }
820
821
822 /*
823  * update the replPropertyMetaData for one element
824  */
825 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
826                                       struct ldb_message *msg,
827                                       struct ldb_message_element *el,
828                                       struct replPropertyMetaDataBlob *omd,
829                                       const struct dsdb_schema *schema,
830                                       uint64_t *seq_num,
831                                       const struct GUID *our_invocation_id,
832                                       NTTIME now)
833 {
834         int i;
835         const struct dsdb_attribute *a;
836         struct replPropertyMetaData1 *md1;
837
838         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
839         if (a == NULL) {
840                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
841                          el->name));
842                 return LDB_ERR_OPERATIONS_ERROR;
843         }
844
845         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
846                 return LDB_SUCCESS;
847         }
848
849         for (i=0; i<omd->ctr.ctr1.count; i++) {
850                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
851         }
852
853 #if W2K3_LINKED_ATTRIBUTES
854         if (a->linkID != 0 && dsdb_functional_level(ldb) > DS_DOMAIN_FUNCTION_2000) {
855                 /* linked attributes are not stored in
856                    replPropertyMetaData in FL above w2k, but we do
857                    raise the seqnum for the object  */
858                 if (*seq_num == 0 &&
859                     ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num) != LDB_SUCCESS) {
860                         return LDB_ERR_OPERATIONS_ERROR;
861                 }
862                 return LDB_SUCCESS;
863         }
864 #endif
865
866         if (i == omd->ctr.ctr1.count) {
867                 /* we need to add a new one */
868                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
869                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
870                 if (omd->ctr.ctr1.array == NULL) {
871                         ldb_oom(ldb);
872                         return LDB_ERR_OPERATIONS_ERROR;
873                 }
874                 omd->ctr.ctr1.count++;
875                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
876         }
877
878         /* Get a new sequence number from the backend. We only do this
879          * if we have a change that requires a new
880          * replPropertyMetaData element 
881          */
882         if (*seq_num == 0) {
883                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
884                 if (ret != LDB_SUCCESS) {
885                         return LDB_ERR_OPERATIONS_ERROR;
886                 }
887         }
888
889         md1 = &omd->ctr.ctr1.array[i];
890         md1->version++;
891         md1->attid                     = a->attributeID_id;
892         md1->originating_change_time   = now;
893         md1->originating_invocation_id = *our_invocation_id;
894         md1->originating_usn           = *seq_num;
895         md1->local_usn                 = *seq_num;
896         
897         return LDB_SUCCESS;
898 }
899
900 /*
901  * update the replPropertyMetaData object each time we modify an
902  * object. This is needed for DRS replication, as the merge on the
903  * client is based on this object 
904  */
905 static int replmd_update_rpmd(struct ldb_module *module, 
906                               const struct dsdb_schema *schema, 
907                               struct ldb_message *msg, uint64_t *seq_num,
908                               time_t t)
909 {
910         const struct ldb_val *omd_value;
911         enum ndr_err_code ndr_err;
912         struct replPropertyMetaDataBlob omd;
913         int i;
914         NTTIME now;
915         const struct GUID *our_invocation_id;
916         int ret;
917         const char *attrs[] = { "replPropertyMetaData" , NULL };
918         struct ldb_result *res;
919         struct ldb_context *ldb;
920
921         ldb = ldb_module_get_ctx(module);
922
923         our_invocation_id = samdb_ntds_invocation_id(ldb);
924         if (!our_invocation_id) {
925                 /* this happens during an initial vampire while
926                    updating the schema */
927                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
928                 return LDB_SUCCESS;
929         }
930
931         unix_to_nt_time(&now, t);
932
933         /* search for the existing replPropertyMetaDataBlob */
934         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
935         if (ret != LDB_SUCCESS || res->count != 1) {
936                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
937                          ldb_dn_get_linearized(msg->dn)));
938                 return LDB_ERR_OPERATIONS_ERROR;
939         }
940                 
941
942         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
943         if (!omd_value) {
944                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
945                          ldb_dn_get_linearized(msg->dn)));
946                 return LDB_ERR_OPERATIONS_ERROR;
947         }
948
949         ndr_err = ndr_pull_struct_blob(omd_value, msg,
950                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
951                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
952         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
953                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
954                          ldb_dn_get_linearized(msg->dn)));
955                 return LDB_ERR_OPERATIONS_ERROR;
956         }
957
958         if (omd.version != 1) {
959                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
960                          omd.version, ldb_dn_get_linearized(msg->dn)));
961                 return LDB_ERR_OPERATIONS_ERROR;
962         }
963
964         for (i=0; i<msg->num_elements; i++) {
965                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
966                                                  our_invocation_id, now);
967                 if (ret != LDB_SUCCESS) {
968                         return ret;
969                 }
970         }
971
972         /*
973          * replmd_update_rpmd_element has done an update if the
974          * seq_num is set
975          */
976         if (*seq_num != 0) {
977                 struct ldb_val *md_value;
978                 struct ldb_message_element *el;
979
980                 md_value = talloc(msg, struct ldb_val);
981                 if (md_value == NULL) {
982                         ldb_oom(ldb);
983                         return LDB_ERR_OPERATIONS_ERROR;
984                 }
985
986                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
987                 if (ret != LDB_SUCCESS) {
988                         return ret;
989                 }
990
991                 ndr_err = ndr_push_struct_blob(md_value, msg, 
992                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
993                                                &omd,
994                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
995                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
996                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
997                                  ldb_dn_get_linearized(msg->dn)));
998                         return LDB_ERR_OPERATIONS_ERROR;
999                 }
1000
1001                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
1002                 if (ret != LDB_SUCCESS) {
1003                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
1004                                  ldb_dn_get_linearized(msg->dn)));
1005                         return ret;
1006                 }
1007
1008                 el->num_values = 1;
1009                 el->values = md_value;
1010         }
1011
1012         return LDB_SUCCESS;     
1013 }
1014
1015
/*
  one linked attribute value in parsed form, as produced by
  get_parsed_dns()
 */
struct parsed_dn {
	/* the value parsed with dsdb_dn_parse() */
	struct dsdb_dn *dsdb_dn;
	/* GUID of the target, used as the sort/search key */
	struct GUID *guid;
	/* the ldb_val inside the message element this entry came from */
	struct ldb_val *v;
};
1021
1022 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
1023 {
1024         return GUID_compare(pdn1->guid, pdn2->guid);
1025 }
1026
1027 static struct parsed_dn *parsed_dn_find(struct parsed_dn *pdn, int count, struct GUID *guid)
1028 {
1029         struct parsed_dn *ret;
1030         BINARY_ARRAY_SEARCH(pdn, count, guid, guid, GUID_compare, ret);
1031         return ret;
1032 }
1033
1034 /*
1035   get a series of message element values as an array of DNs and GUIDs
1036   the result is sorted by GUID
1037  */
1038 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1039                           struct ldb_message_element *el, struct parsed_dn **pdn,
1040                           const char *ldap_oid)
1041 {
1042         int i;
1043         struct ldb_context *ldb = ldb_module_get_ctx(module);
1044
1045         if (el == NULL) {
1046                 *pdn = NULL;
1047                 return LDB_SUCCESS;
1048         }
1049
1050         (*pdn) = talloc_array(mem_ctx, struct parsed_dn, el->num_values);
1051         if (!*pdn) {
1052                 ldb_module_oom(module);
1053                 return LDB_ERR_OPERATIONS_ERROR;
1054         }
1055
1056         for (i=0; i<el->num_values; i++) {
1057                 struct ldb_val *v = &el->values[i];
1058                 NTSTATUS status;
1059                 struct ldb_dn *dn;
1060                 struct parsed_dn *p;
1061
1062                 p = &(*pdn)[i];
1063
1064                 p->dsdb_dn = dsdb_dn_parse(*pdn, ldb, v, ldap_oid);
1065                 if (p->dsdb_dn == NULL) {
1066                         return LDB_ERR_INVALID_DN_SYNTAX;
1067                 }
1068
1069                 dn = p->dsdb_dn->dn;
1070
1071                 p->guid = talloc(*pdn, struct GUID);
1072                 if (p->guid == NULL) {
1073                         ldb_module_oom(module);
1074                         return LDB_ERR_OPERATIONS_ERROR;
1075                 }
1076
1077                 status = dsdb_get_extended_dn_guid(dn, p->guid);
1078                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
1079                         /* we got a DN without a GUID - go find the GUID */
1080                         int ret = dsdb_find_guid_by_dn(ldb, dn, p->guid);
1081                         if (ret != LDB_SUCCESS) {
1082                                 ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
1083                                                        ldb_dn_get_linearized(dn));
1084                                 return ret;
1085                         }
1086                 } else if (!NT_STATUS_IS_OK(status)) {
1087                         return LDB_ERR_OPERATIONS_ERROR;
1088                 }
1089
1090                 /* keep a pointer to the original ldb_val */
1091                 p->v = v;
1092         }
1093
1094         qsort(*pdn, el->num_values, sizeof((*pdn)[0]), (comparison_fn_t)parsed_dn_compare);
1095
1096         return LDB_SUCCESS;
1097 }
1098
1099 /*
1100   build a new extended DN, including all meta data fields
1101
1102   DELETED        = 1 or missing
1103   RMD_ADDTIME    = originating_add_time
1104   RMD_INVOCID    = originating_invocation_id
1105   RMD_CHANGETIME = originating_change_time
1106   RMD_USN        = originating_usn
1107   RMD_VERSION    = version
1108  */
1109 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct parsed_dn *p,
1110                                const struct GUID *invocation_id, uint64_t seq_num, time_t t)
1111 {
1112         struct ldb_dn *dn = p->dsdb_dn->dn;
1113         NTTIME now;
1114         const char *tstring, *usn_string;
1115         struct ldb_val tval;
1116         struct ldb_val iid;
1117         struct ldb_val usnv;
1118         struct ldb_val vers;
1119         NTSTATUS status;
1120         int ret;
1121         const char *dnstring;
1122
1123         unix_to_nt_time(&now, t);
1124         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)now);
1125         if (!tstring) {
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128         tval = data_blob_string_const(tstring);
1129
1130         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
1131         if (!usn_string) {
1132                 return LDB_ERR_OPERATIONS_ERROR;
1133         }
1134         usnv = data_blob_string_const(usn_string);
1135
1136         vers = data_blob_string_const("0");
1137
1138         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
1139         if (!NT_STATUS_IS_OK(status)) {
1140                 return LDB_ERR_OPERATIONS_ERROR;
1141         }
1142
1143         ret = ldb_dn_set_extended_component(dn, "DELETED", NULL);
1144         if (ret != LDB_SUCCESS) return ret;
1145         ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
1146         if (ret != LDB_SUCCESS) return ret;
1147         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
1148         if (ret != LDB_SUCCESS) return ret;
1149         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
1150         if (ret != LDB_SUCCESS) return ret;
1151         ret = ldb_dn_set_extended_component(dn, "RMD_USN", &usnv);
1152         if (ret != LDB_SUCCESS) return ret;
1153         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
1154         if (ret != LDB_SUCCESS) return ret;
1155
1156         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, p->dsdb_dn, 1);
1157         if (dnstring == NULL) {
1158                 return LDB_ERR_OPERATIONS_ERROR;
1159         }
1160         *v = data_blob_string_const(dnstring);
1161
1162         return LDB_SUCCESS;
1163 }
1164
1165
1166 /*
1167   update an extended DN, including all meta data fields
1168
1169   see replmd_build_la_val for value names
1170  */
1171 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct parsed_dn *p,
1172                                 struct parsed_dn *old_p, const struct GUID *invocation_id,
1173                                 uint64_t seq_num, time_t t, bool deleted)
1174 {
1175         struct ldb_dn *dn = p->dsdb_dn->dn;
1176         NTTIME now;
1177         const char *tstring, *usn_string;
1178         struct ldb_val tval;
1179         struct ldb_val iid;
1180         struct ldb_val usnv;
1181         struct ldb_val vers;
1182         const struct ldb_val *old_addtime, *old_version;
1183         NTSTATUS status;
1184         int ret;
1185         const char *dnstring;
1186
1187         unix_to_nt_time(&now, t);
1188         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)now);
1189         if (!tstring) {
1190                 return LDB_ERR_OPERATIONS_ERROR;
1191         }
1192         tval = data_blob_string_const(tstring);
1193
1194         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
1195         if (!usn_string) {
1196                 return LDB_ERR_OPERATIONS_ERROR;
1197         }
1198         usnv = data_blob_string_const(usn_string);
1199
1200         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
1201         if (!NT_STATUS_IS_OK(status)) {
1202                 return LDB_ERR_OPERATIONS_ERROR;
1203         }
1204
1205         if (deleted) {
1206                 struct ldb_val dv;
1207                 dv = data_blob_string_const("1");
1208                 ret = ldb_dn_set_extended_component(dn, "DELETED", &dv);
1209         } else {
1210                 ret = ldb_dn_set_extended_component(dn, "DELETED", NULL);
1211         }
1212         if (ret != LDB_SUCCESS) return ret;
1213
1214         /* get the ADDTIME from the original */
1215         old_addtime = ldb_dn_get_extended_component(old_p->dsdb_dn->dn, "RMD_ADDTIME");
1216         if (old_addtime == NULL) {
1217                 old_addtime = &tval;
1218         }
1219         if (p != old_p) {
1220                 ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", old_addtime);
1221                 if (ret != LDB_SUCCESS) return ret;
1222         }
1223
1224         /* use our invocation id */
1225         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
1226         if (ret != LDB_SUCCESS) return ret;
1227
1228         /* changetime is the current time */
1229         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
1230         if (ret != LDB_SUCCESS) return ret;
1231
1232         /* update the USN */
1233         ret = ldb_dn_set_extended_component(dn, "RMD_USN", &usnv);
1234         if (ret != LDB_SUCCESS) return ret;
1235
1236         /* increase the version by 1 */
1237         old_version = ldb_dn_get_extended_component(old_p->dsdb_dn->dn, "RMD_VERSION");
1238         if (old_version == NULL) {
1239                 vers = data_blob_string_const("0");
1240         } else {
1241                 char *vstring;
1242                 vstring = talloc_strndup(dn, (const char *)old_version->data, old_version->length);
1243                 if (!vstring) {
1244                         return LDB_ERR_OPERATIONS_ERROR;
1245                 }
1246                 vstring = talloc_asprintf(dn, "%lu",
1247                                           (unsigned long)strtoul(vstring, NULL, 0)+1);
1248                 vers = data_blob_string_const(vstring);
1249         }
1250         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
1251         if (ret != LDB_SUCCESS) return ret;
1252
1253         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, p->dsdb_dn, 1);
1254         if (dnstring == NULL) {
1255                 return LDB_ERR_OPERATIONS_ERROR;
1256         }
1257         *v = data_blob_string_const(dnstring);
1258
1259         return LDB_SUCCESS;
1260 }
1261
1262 /*
1263   handle adding a linked attribute
1264  */
1265 static int replmd_modify_la_add(struct ldb_module *module,
1266                                 struct dsdb_schema *schema,
1267                                 struct ldb_message *msg,
1268                                 struct ldb_message_element *el,
1269                                 struct ldb_message_element *old_el,
1270                                 const struct dsdb_attribute *schema_attr,
1271                                 uint64_t seq_num,
1272                                 time_t t,
1273                                 struct GUID *msg_guid)
1274 {
1275         int i;
1276         struct parsed_dn *dns, *old_dns;
1277         TALLOC_CTX *tmp_ctx = talloc_new(msg);
1278         int ret;
1279         struct ldb_val *new_values = NULL;
1280         unsigned int num_new_values = 0;
1281         unsigned old_num_values = old_el?old_el->num_values:0;
1282         const struct GUID *invocation_id;
1283         struct ldb_context *ldb = ldb_module_get_ctx(module);
1284
1285         ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid);
1286         if (ret != LDB_SUCCESS) {
1287                 talloc_free(tmp_ctx);
1288                 return ret;
1289         }
1290
1291         ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid);
1292         if (ret != LDB_SUCCESS) {
1293                 talloc_free(tmp_ctx);
1294                 return ret;
1295         }
1296
1297         invocation_id = samdb_ntds_invocation_id(ldb);
1298         if (!invocation_id) {
1299                 return LDB_ERR_OPERATIONS_ERROR;
1300         }
1301
1302         /* for each new value, see if it exists already with the same GUID */
1303         for (i=0; i<el->num_values; i++) {
1304                 struct parsed_dn *p = parsed_dn_find(old_dns, old_num_values, dns[i].guid);
1305                 if (p == NULL) {
1306                         /* this is a new linked attribute value */
1307                         new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val, num_new_values+1);
1308                         if (new_values == NULL) {
1309                                 ldb_module_oom(module);
1310                                 talloc_free(tmp_ctx);
1311                                 return LDB_ERR_OPERATIONS_ERROR;
1312                         }
1313                         ret = replmd_build_la_val(new_values, &new_values[num_new_values], &dns[i],
1314                                                   invocation_id, seq_num, t);
1315                         if (ret != LDB_SUCCESS) {
1316                                 talloc_free(tmp_ctx);
1317                                 return ret;
1318                         }
1319                         num_new_values++;
1320                 } else {
1321                         /* this is only allowed if the GUID was
1322                            previously deleted. */
1323                         const struct ldb_val *v;
1324                         v = ldb_dn_get_extended_component(p->dsdb_dn->dn, "DELETED");
1325                         if (v == NULL) {
1326                                 ldb_asprintf_errstring(ldb, "Attribute %s already exists for target GUID %s",
1327                                                        el->name, GUID_string(tmp_ctx, p->guid));
1328                                 talloc_free(tmp_ctx);
1329                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1330                         }
1331                         ret = replmd_update_la_val(old_el->values, p->v, &dns[i], p, invocation_id, seq_num, t, false);
1332                         if (ret != LDB_SUCCESS) {
1333                                 talloc_free(tmp_ctx);
1334                                 return ret;
1335                         }
1336                 }
1337
1338                 ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr);
1339                 if (ret != LDB_SUCCESS) {
1340                         talloc_free(tmp_ctx);
1341                         return ret;
1342                 }
1343         }
1344
1345         /* add the new ones on to the end of the old values, constructing a new el->values */
1346         el->values = talloc_realloc(msg->elements, old_el?old_el->values:NULL,
1347                                     struct ldb_val,
1348                                     old_num_values+num_new_values);
1349         if (el->values == NULL) {
1350                 ldb_module_oom(module);
1351                 return LDB_ERR_OPERATIONS_ERROR;
1352         }
1353
1354         memcpy(&el->values[old_num_values], new_values, num_new_values*sizeof(struct ldb_val));
1355         el->num_values = old_num_values + num_new_values;
1356
1357         talloc_steal(msg->elements, el->values);
1358         talloc_steal(el->values, new_values);
1359
1360         talloc_free(tmp_ctx);
1361
1362         /* we now tell the backend to replace all existing values
1363            with the one we have constructed */
1364         el->flags = LDB_FLAG_MOD_REPLACE;
1365
1366         return LDB_SUCCESS;
1367 }
1368
1369
1370 /*
1371   handle deleting all active linked attributes
1372  */
1373 static int replmd_modify_la_delete(struct ldb_module *module,
1374                                    struct dsdb_schema *schema,
1375                                    struct ldb_message *msg,
1376                                    struct ldb_message_element *el,
1377                                    struct ldb_message_element *old_el,
1378                                    const struct dsdb_attribute *schema_attr,
1379                                    uint64_t seq_num,
1380                                    time_t t,
1381                                    struct GUID *msg_guid)
1382 {
1383         int i;
1384         struct parsed_dn *dns, *old_dns;
1385         TALLOC_CTX *tmp_ctx = talloc_new(msg);
1386         int ret;
1387         const struct GUID *invocation_id;
1388         struct ldb_context *ldb = ldb_module_get_ctx(module);
1389
1390         /* check if there is nothing to delete */
1391         if ((!old_el || old_el->num_values == 0) &&
1392             el->num_values == 0) {
1393                 return LDB_SUCCESS;
1394         }
1395
1396         if (!old_el || old_el->num_values == 0) {
1397                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
1398         }
1399
1400         ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid);
1401         if (ret != LDB_SUCCESS) {
1402                 talloc_free(tmp_ctx);
1403                 return ret;
1404         }
1405
1406         ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid);
1407         if (ret != LDB_SUCCESS) {
1408                 talloc_free(tmp_ctx);
1409                 return ret;
1410         }
1411
1412         invocation_id = samdb_ntds_invocation_id(ldb);
1413         if (!invocation_id) {
1414                 return LDB_ERR_OPERATIONS_ERROR;
1415         }
1416
1417         el->values = NULL;
1418
1419         /* see if we are being asked to delete any links that
1420            don't exist or are already deleted */
1421         for (i=0; i<el->num_values; i++) {
1422                 struct parsed_dn *p = &dns[i];
1423                 struct parsed_dn *p2;
1424                 const struct ldb_val *v;
1425
1426                 p2 = parsed_dn_find(old_dns, old_el->num_values, p->guid);
1427                 if (!p2) {
1428                         ldb_asprintf_errstring(ldb, "Attribute %s doesn't exist for target GUID %s",
1429                                                el->name, GUID_string(tmp_ctx, p->guid));
1430                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1431                 }
1432                 v = ldb_dn_get_extended_component(p2->dsdb_dn->dn, "DELETED");
1433                 if (v) {
1434                         ldb_asprintf_errstring(ldb, "Attribute %s already deleted for target GUID %s",
1435                                                el->name, GUID_string(tmp_ctx, p->guid));
1436                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1437                 }
1438         }
1439
1440         /* for each new value, see if it exists already with the same GUID
1441            if it is not already deleted and matches the delete list then delete it
1442         */
1443         for (i=0; i<old_el->num_values; i++) {
1444                 struct parsed_dn *p = &old_dns[i];
1445                 const struct ldb_val *v;
1446
1447                 if (dns && parsed_dn_find(dns, el->num_values, p->guid) == NULL) {
1448                         continue;
1449                 }
1450
1451                 v = ldb_dn_get_extended_component(p->dsdb_dn->dn, "DELETED");
1452                 if (v != NULL) continue;
1453
1454                 ret = replmd_update_la_val(old_el->values, p->v, p, p, invocation_id, seq_num, t, true);
1455                 if (ret != LDB_SUCCESS) {
1456                         talloc_free(tmp_ctx);
1457                         return ret;
1458                 }
1459
1460                 ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr);
1461                 if (ret != LDB_SUCCESS) {
1462                         talloc_free(tmp_ctx);
1463                         return ret;
1464                 }
1465         }
1466
1467         el->values = talloc_steal(msg->elements, old_el->values);
1468         el->num_values = old_el->num_values;
1469
1470         talloc_free(tmp_ctx);
1471
1472         /* we now tell the backend to replace all existing values
1473            with the one we have constructed */
1474         el->flags = LDB_FLAG_MOD_REPLACE;
1475
1476         return LDB_SUCCESS;
1477 }
1478
1479 /*
1480   handle replacing a linked attribute
1481  */
1482 static int replmd_modify_la_replace(struct ldb_module *module,
1483                                     struct dsdb_schema *schema,
1484                                     struct ldb_message *msg,
1485                                     struct ldb_message_element *el,
1486                                     struct ldb_message_element *old_el,
1487                                     const struct dsdb_attribute *schema_attr,
1488                                     uint64_t seq_num,
1489                                     time_t t,
1490                                     struct GUID *msg_guid)
1491 {
1492         int i;
1493         struct parsed_dn *dns, *old_dns;
1494         TALLOC_CTX *tmp_ctx = talloc_new(msg);
1495         int ret;
1496         const struct GUID *invocation_id;
1497         struct ldb_context *ldb = ldb_module_get_ctx(module);
1498         struct ldb_val *new_values = NULL;
1499         uint32_t num_new_values = 0;
1500         unsigned old_num_values = old_el?old_el->num_values:0;
1501
1502         /* check if there is nothing to replace */
1503         if ((!old_el || old_el->num_values == 0) &&
1504             el->num_values == 0) {
1505                 return LDB_SUCCESS;
1506         }
1507
1508         ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid);
1509         if (ret != LDB_SUCCESS) {
1510                 talloc_free(tmp_ctx);
1511                 return ret;
1512         }
1513
1514         ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid);
1515         if (ret != LDB_SUCCESS) {
1516                 talloc_free(tmp_ctx);
1517                 return ret;
1518         }
1519
1520         invocation_id = samdb_ntds_invocation_id(ldb);
1521         if (!invocation_id) {
1522                 return LDB_ERR_OPERATIONS_ERROR;
1523         }
1524
1525         /* mark all the old ones as deleted */
1526         for (i=0; i<old_num_values; i++) {
1527                 struct parsed_dn *old_p = &old_dns[i];
1528                 struct parsed_dn *p;
1529                 const struct ldb_val *v;
1530
1531                 v = ldb_dn_get_extended_component(old_p->dsdb_dn->dn, "DELETED");
1532                 if (v) continue;
1533
1534                 ret = replmd_add_backlink(module, schema, msg_guid, old_dns[i].guid, false, schema_attr);
1535                 if (ret != LDB_SUCCESS) {
1536                         talloc_free(tmp_ctx);
1537                         return ret;
1538                 }
1539
1540                 p = parsed_dn_find(dns, el->num_values, old_p->guid);
1541                 if (p) {
1542                         /* we don't delete it if we are re-adding it */
1543                         continue;
1544                 }
1545
1546                 ret = replmd_update_la_val(old_el->values, old_p->v, old_p, old_p,
1547                                            invocation_id, seq_num, t, true);
1548                 if (ret != LDB_SUCCESS) {
1549                         talloc_free(tmp_ctx);
1550                         return ret;
1551                 }
1552         }
1553
1554         /* for each new value, either update its meta-data, or add it
1555          * to old_el
1556         */
1557         for (i=0; i<el->num_values; i++) {
1558                 struct parsed_dn *p = &dns[i], *old_p;
1559
1560                 if (old_dns &&
1561                     (old_p = parsed_dn_find(old_dns,
1562                                             old_num_values, p->guid)) != NULL) {
1563                         /* update in place */
1564                         ret = replmd_update_la_val(old_el->values, old_p->v, old_p,
1565                                                    old_p, invocation_id,
1566                                                    seq_num, t, false);
1567                         if (ret != LDB_SUCCESS) {
1568                                 talloc_free(tmp_ctx);
1569                                 return ret;
1570                         }
1571                 } else {
1572                         /* add a new one */
1573                         new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val,
1574                                                     num_new_values+1);
1575                         if (new_values == NULL) {
1576                                 ldb_module_oom(module);
1577                                 talloc_free(tmp_ctx);
1578                                 return LDB_ERR_OPERATIONS_ERROR;
1579                         }
1580                         ret = replmd_build_la_val(new_values, &new_values[num_new_values], &dns[i],
1581                                                   invocation_id, seq_num, t);
1582                         if (ret != LDB_SUCCESS) {
1583                                 talloc_free(tmp_ctx);
1584                                 return ret;
1585                         }
1586                         num_new_values++;
1587                 }
1588
1589                 ret = replmd_add_backlink(module, schema, msg_guid, dns[i].guid, true, schema_attr);
1590                 if (ret != LDB_SUCCESS) {
1591                         talloc_free(tmp_ctx);
1592                         return ret;
1593                 }
1594         }
1595
1596         /* add the new values to the end of old_el */
1597         if (num_new_values != 0) {
1598                 el->values = talloc_realloc(msg->elements, old_el?old_el->values:NULL,
1599                                             struct ldb_val, old_num_values+num_new_values);
1600                 if (el->values == NULL) {
1601                         ldb_module_oom(module);
1602                         return LDB_ERR_OPERATIONS_ERROR;
1603                 }
1604                 memcpy(&el->values[old_num_values], &new_values[0],
1605                        sizeof(struct ldb_val)*num_new_values);
1606                 el->num_values = old_num_values + num_new_values;
1607                 talloc_steal(msg->elements, new_values);
1608         } else {
1609                 el->values = old_el->values;
1610                 el->num_values = old_el->num_values;
1611                 talloc_steal(msg->elements, el->values);
1612         }
1613
1614         talloc_free(tmp_ctx);
1615
1616         /* we now tell the backend to replace all existing values
1617            with the one we have constructed */
1618         el->flags = LDB_FLAG_MOD_REPLACE;
1619
1620         return LDB_SUCCESS;
1621 }
1622
1623
1624 /*
1625   handle linked attributes in modify requests
1626  */
1627 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
1628                                                struct ldb_message *msg,
1629                                                uint64_t seq_num, time_t t)
1630 {
1631         struct ldb_result *res;
1632         int ret, i;
1633         struct ldb_context *ldb = ldb_module_get_ctx(module);
1634         struct ldb_message *old_msg;
1635         struct dsdb_schema *schema = dsdb_get_schema(ldb);
1636         struct GUID old_guid;
1637
1638         if (seq_num == 0) {
1639                 /* there the replmd_update_rpmd code has already
1640                  * checked and saw that there are no linked
1641                  * attributes */
1642                 return LDB_SUCCESS;
1643         }
1644
1645 #if !W2K3_LINKED_ATTRIBUTES
1646         return LDB_SUCCESS;
1647 #endif
1648
1649         if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
1650                 /* don't do anything special for linked attributes */
1651                 return LDB_SUCCESS;
1652         }
1653
1654         ret = dsdb_module_search_dn(module, msg, &res, msg->dn, NULL,
1655                                     DSDB_SEARCH_SHOW_DELETED |
1656                                     DSDB_SEARCH_REVEAL_INTERNALS |
1657                                     DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT);
1658         if (ret != LDB_SUCCESS) {
1659                 return ret;
1660         }
1661         old_msg = res->msgs[0];
1662
1663         old_guid = samdb_result_guid(old_msg, "objectGUID");
1664
1665         for (i=0; i<msg->num_elements; i++) {
1666                 struct ldb_message_element *el = &msg->elements[i];
1667                 struct ldb_message_element *old_el, *new_el;
1668                 const struct dsdb_attribute *schema_attr
1669                         = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
1670                 if (!schema_attr) {
1671                         ldb_asprintf_errstring(ldb,
1672                                                "attribute %s is not a valid attribute in schema", el->name);
1673                         return LDB_ERR_OBJECT_CLASS_VIOLATION;
1674                 }
1675                 if (schema_attr->linkID == 0) {
1676                         continue;
1677                 }
1678                 if ((schema_attr->linkID & 1) == 1) {
1679                         /* Odd is for the target.  Illegal to modify */
1680                         ldb_asprintf_errstring(ldb,
1681                                                "attribute %s must not be modified directly, it is a linked attribute", el->name);
1682                         return LDB_ERR_UNWILLING_TO_PERFORM;
1683                 }
1684                 old_el = ldb_msg_find_element(old_msg, el->name);
1685                 switch (el->flags & LDB_FLAG_MOD_MASK) {
1686                 case LDB_FLAG_MOD_REPLACE:
1687                         ret = replmd_modify_la_replace(module, schema, msg, el, old_el, schema_attr, seq_num, t, &old_guid);
1688                         break;
1689                 case LDB_FLAG_MOD_DELETE:
1690                         ret = replmd_modify_la_delete(module, schema, msg, el, old_el, schema_attr, seq_num, t, &old_guid);
1691                         break;
1692                 case LDB_FLAG_MOD_ADD:
1693                         ret = replmd_modify_la_add(module, schema, msg, el, old_el, schema_attr, seq_num, t, &old_guid);
1694                         break;
1695                 default:
1696                         ldb_asprintf_errstring(ldb,
1697                                                "invalid flags 0x%x for %s linked attribute",
1698                                                el->flags, el->name);
1699                         return LDB_ERR_UNWILLING_TO_PERFORM;
1700                 }
1701                 if (ret != LDB_SUCCESS) {
1702                         return ret;
1703                 }
1704                 if (old_el) {
1705                         ldb_msg_remove_attr(old_msg, el->name);
1706                 }
1707                 ldb_msg_add_empty(old_msg, el->name, 0, &new_el);
1708                 new_el->num_values = el->num_values;
1709                 new_el->values = el->values;
1710
1711                 /* TODO: this relises a bit too heavily on the exact
1712                    behaviour of ldb_msg_find_element and
1713                    ldb_msg_remove_element */
1714                 old_el = ldb_msg_find_element(msg, el->name);
1715                 if (old_el != el) {
1716                         ldb_msg_remove_element(msg, old_el);
1717                         i--;
1718                 }
1719         }
1720
1721         talloc_free(res);
1722         return ret;
1723 }
1724
1725
1726
1727 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
1728 {
1729         struct ldb_context *ldb;
1730         struct replmd_replicated_request *ac;
1731         struct ldb_request *down_req;
1732         struct ldb_message *msg;
1733         time_t t = time(NULL);
1734         int ret;
1735
1736         /* do not manipulate our control entries */
1737         if (ldb_dn_is_special(req->op.mod.message->dn)) {
1738                 return ldb_next_request(module, req);
1739         }
1740
1741         ldb = ldb_module_get_ctx(module);
1742
1743         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
1744
1745         ac = replmd_ctx_init(module, req);
1746         if (!ac) {
1747                 return LDB_ERR_OPERATIONS_ERROR;
1748         }
1749
1750         /* we have to copy the message as the caller might have it as a const */
1751         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
1752         if (msg == NULL) {
1753                 ldb_oom(ldb);
1754                 talloc_free(ac);
1755                 return LDB_ERR_OPERATIONS_ERROR;
1756         }
1757
1758         ret = replmd_update_rpmd(module, ac->schema, msg, &ac->seq_num, t);
1759         if (ret != LDB_SUCCESS) {
1760                 talloc_free(ac);
1761                 return ret;
1762         }
1763
1764         ret = replmd_modify_handle_linked_attribs(module, msg, ac->seq_num, t);
1765         if (ret != LDB_SUCCESS) {
1766                 talloc_free(ac);
1767                 return ret;
1768         }
1769
1770         /* TODO:
1771          * - replace the old object with the newly constructed one
1772          */
1773
1774         ret = ldb_build_mod_req(&down_req, ldb, ac,
1775                                 msg,
1776                                 req->controls,
1777                                 ac, replmd_op_callback,
1778                                 req);
1779         if (ret != LDB_SUCCESS) {
1780                 talloc_free(ac);
1781                 return ret;
1782         }
1783         talloc_steal(down_req, msg);
1784
1785         /* we only change whenChanged and uSNChanged if the seq_num
1786            has changed */
1787         if (ac->seq_num != 0) {
1788                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1789                         talloc_free(ac);
1790                         return ret;
1791                 }
1792
1793                 if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1794                         talloc_free(ac);
1795                         return ret;
1796                 }
1797         }
1798
1799         /* go on with the call chain */
1800         return ldb_next_request(module, down_req);
1801 }
1802
1803 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
1804
1805 /*
1806   handle a rename request
1807
1808   On a rename we need to do an extra ldb_modify which sets the
1809   whenChanged and uSNChanged attributes.  We do this in a callback after the success.
1810  */
1811 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
1812 {
1813         struct ldb_context *ldb;
1814         struct replmd_replicated_request *ac;
1815         int ret;
1816         struct ldb_request *down_req;
1817
1818         /* do not manipulate our control entries */
1819         if (ldb_dn_is_special(req->op.mod.message->dn)) {
1820                 return ldb_next_request(module, req);
1821         }
1822
1823         ldb = ldb_module_get_ctx(module);
1824
1825         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
1826
1827         ac = replmd_ctx_init(module, req);
1828         if (!ac) {
1829                 return LDB_ERR_OPERATIONS_ERROR;
1830         }
1831         ret = ldb_build_rename_req(&down_req, ldb, ac,
1832                                    ac->req->op.rename.olddn,
1833                                    ac->req->op.rename.newdn,
1834                                    ac->req->controls,
1835                                    ac, replmd_rename_callback,
1836                                    ac->req);
1837
1838         if (ret != LDB_SUCCESS) {
1839                 talloc_free(ac);
1840                 return ret;
1841         }
1842
1843         /* go on with the call chain */
1844         return ldb_next_request(module, down_req);
1845 }
1846
1847 /* After the rename is compleated, update the whenchanged etc */
1848 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
1849 {
1850         struct ldb_context *ldb;
1851         struct replmd_replicated_request *ac;
1852         struct ldb_request *down_req;
1853         struct ldb_message *msg;
1854         time_t t = time(NULL);
1855         int ret;
1856
1857         ac = talloc_get_type(req->context, struct replmd_replicated_request);
1858         ldb = ldb_module_get_ctx(ac->module);
1859
1860         if (ares->error != LDB_SUCCESS) {
1861                 return ldb_module_done(ac->req, ares->controls,
1862                                         ares->response, ares->error);
1863         }
1864
1865         if (ares->type != LDB_REPLY_DONE) {
1866                 ldb_set_errstring(ldb,
1867                                   "invalid ldb_reply_type in callback");
1868                 talloc_free(ares);
1869                 return ldb_module_done(ac->req, NULL, NULL,
1870                                         LDB_ERR_OPERATIONS_ERROR);
1871         }
1872
1873         /* Get a sequence number from the backend */
1874         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
1875         if (ret != LDB_SUCCESS) {
1876                 return ret;
1877         }
1878
1879         /* TODO:
1880          * - replace the old object with the newly constructed one
1881          */
1882
1883         msg = ldb_msg_new(ac);
1884         if (msg == NULL) {
1885                 ldb_oom(ldb);
1886                 return LDB_ERR_OPERATIONS_ERROR;
1887         }
1888
1889         msg->dn = ac->req->op.rename.newdn;
1890
1891         ret = ldb_build_mod_req(&down_req, ldb, ac,
1892                                 msg,
1893                                 req->controls,
1894                                 ac, replmd_op_callback,
1895                                 req);
1896
1897         if (ret != LDB_SUCCESS) {
1898                 talloc_free(ac);
1899                 return ret;
1900         }
1901         talloc_steal(down_req, msg);
1902
1903         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1904                 talloc_free(ac);
1905                 return ret;
1906         }
1907         
1908         if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1909                 talloc_free(ac);
1910                 return ret;
1911         }
1912
1913         /* go on with the call chain - do the modify after the rename */
1914         return ldb_next_request(ac->module, down_req);
1915 }
1916
1917
/*
  Common exit point for ldb errors in the replicated-objects code.
  Currently it just hands the ldb error code back unchanged; 'ar' is
  not used.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	const int ldb_status = ret;

	(void)ar;

	return ldb_status;
}
1922
1923 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1924 {
1925         int ret = LDB_ERR_OTHER;
1926         /* TODO: do some error mapping */
1927         return ret;
1928 }
1929
1930 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1931 {
1932         struct ldb_context *ldb;
1933         struct ldb_request *change_req;
1934         enum ndr_err_code ndr_err;
1935         struct ldb_message *msg;
1936         struct replPropertyMetaDataBlob *md;
1937         struct ldb_val md_value;
1938         uint32_t i;
1939         int ret;
1940
1941         /*
1942          * TODO: check if the parent object exist
1943          */
1944
1945         /*
1946          * TODO: handle the conflict case where an object with the
1947          *       same name exist
1948          */
1949
1950         ldb = ldb_module_get_ctx(ar->module);
1951         msg = ar->objs->objects[ar->index_current].msg;
1952         md = ar->objs->objects[ar->index_current].meta_data;
1953
1954         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1955         if (ret != LDB_SUCCESS) {
1956                 return replmd_replicated_request_error(ar, ret);
1957         }
1958
1959         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1960         if (ret != LDB_SUCCESS) {
1961                 return replmd_replicated_request_error(ar, ret);
1962         }
1963
1964         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1965         if (ret != LDB_SUCCESS) {
1966                 return replmd_replicated_request_error(ar, ret);
1967         }
1968
1969         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
1970         if (ret != LDB_SUCCESS) {
1971                 return replmd_replicated_request_error(ar, ret);
1972         }
1973
1974         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1975         if (ret != LDB_SUCCESS) {
1976                 return replmd_replicated_request_error(ar, ret);
1977         }
1978
1979         /* remove any message elements that have zero values */
1980         for (i=0; i<msg->num_elements; i++) {
1981                 if (msg->elements[i].num_values == 0) {
1982                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1983                                  msg->elements[i].name));
1984                         memmove(&msg->elements[i], 
1985                                 &msg->elements[i+1], 
1986                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1987                         msg->num_elements--;
1988                         i--;
1989                 }
1990         }
1991         
1992         /*
1993          * the meta data array is already sorted by the caller
1994          */
1995         for (i=0; i < md->ctr.ctr1.count; i++) {
1996                 md->ctr.ctr1.array[i].local_usn = ar->seq_num;
1997         }
1998         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1999                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
2000                                        md,
2001                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
2002         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2003                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2004                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2005         }
2006         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
2007         if (ret != LDB_SUCCESS) {
2008                 return replmd_replicated_request_error(ar, ret);
2009         }
2010
2011         replmd_ldb_message_sort(msg, ar->schema);
2012
2013         if (DEBUGLVL(4)) {
2014                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
2015                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
2016                 talloc_free(s);
2017         }
2018
2019         ret = ldb_build_add_req(&change_req,
2020                                 ldb,
2021                                 ar,
2022                                 msg,
2023                                 ar->controls,
2024                                 ar,
2025                                 replmd_op_callback,
2026                                 ar->req);
2027         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2028
2029         return ldb_next_request(ar->module, change_req);
2030 }
2031
2032 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
2033                                                          struct replPropertyMetaData1 *m2)
2034 {
2035         int ret;
2036
2037         if (m1->version != m2->version) {
2038                 return m1->version - m2->version;
2039         }
2040
2041         if (m1->originating_change_time != m2->originating_change_time) {
2042                 return m1->originating_change_time - m2->originating_change_time;
2043         }
2044
2045         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
2046         if (ret != 0) {
2047                 return ret;
2048         }
2049
2050         return m1->originating_usn - m2->originating_usn;
2051 }
2052
2053 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
2054 {
2055         struct ldb_context *ldb;
2056         struct ldb_request *change_req;
2057         enum ndr_err_code ndr_err;
2058         struct ldb_message *msg;
2059         struct replPropertyMetaDataBlob *rmd;
2060         struct replPropertyMetaDataBlob omd;
2061         const struct ldb_val *omd_value;
2062         struct replPropertyMetaDataBlob nmd;
2063         struct ldb_val nmd_value;
2064         uint32_t i,j,ni=0;
2065         uint32_t removed_attrs = 0;
2066         int ret;
2067
2068         ldb = ldb_module_get_ctx(ar->module);
2069         msg = ar->objs->objects[ar->index_current].msg;
2070         rmd = ar->objs->objects[ar->index_current].meta_data;
2071         ZERO_STRUCT(omd);
2072         omd.version = 1;
2073
2074         /*
2075          * TODO: check repl data is correct after a rename
2076          */
2077         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
2078                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
2079                           ldb_dn_get_linearized(ar->search_msg->dn),
2080                           ldb_dn_get_linearized(msg->dn));
2081                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
2082                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
2083                                   ldb_dn_get_linearized(ar->search_msg->dn),
2084                                   ldb_dn_get_linearized(msg->dn),
2085                                   ldb_errstring(ldb));
2086                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
2087                 }
2088         }
2089
2090         /* find existing meta data */
2091         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
2092         if (omd_value) {
2093                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
2094                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
2095                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
2096                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2097                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2098                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2099                 }
2100
2101                 if (omd.version != 1) {
2102                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2103                 }
2104         }
2105
2106         ZERO_STRUCT(nmd);
2107         nmd.version = 1;
2108         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
2109         nmd.ctr.ctr1.array = talloc_array(ar,
2110                                           struct replPropertyMetaData1,
2111                                           nmd.ctr.ctr1.count);
2112         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2113
2114         /* first copy the old meta data */
2115         for (i=0; i < omd.ctr.ctr1.count; i++) {
2116                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
2117                 ni++;
2118         }
2119
2120         /* now merge in the new meta data */
2121         for (i=0; i < rmd->ctr.ctr1.count; i++) {
2122                 bool found = false;
2123
2124                 for (j=0; j < ni; j++) {
2125                         int cmp;
2126
2127                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
2128                                 continue;
2129                         }
2130
2131                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
2132                                                                             &nmd.ctr.ctr1.array[j]);
2133                         if (cmp > 0) {
2134                                 /* replace the entry */
2135                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
2136                                 found = true;
2137                                 break;
2138                         }
2139
2140                         /* we don't want to apply this change so remove the attribute */
2141                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
2142                         removed_attrs++;
2143
2144                         found = true;
2145                         break;
2146                 }
2147
2148                 if (found) continue;
2149
2150                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
2151                 ni++;
2152         }
2153
2154         /*
2155          * finally correct the size of the meta_data array
2156          */
2157         nmd.ctr.ctr1.count = ni;
2158
2159         /*
2160          * the rdn attribute (the alias for the name attribute),
2161          * 'cn' for most objects is the last entry in the meta data array
2162          * we have stored
2163          *
2164          * sort the new meta data array
2165          */
2166         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
2167         if (ret != LDB_SUCCESS) {
2168                 return ret;
2169         }
2170
2171         /*
2172          * check if some replicated attributes left, otherwise skip the ldb_modify() call
2173          */
2174         if (msg->num_elements == 0) {
2175                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
2176                           ar->index_current);
2177
2178                 ar->index_current++;
2179                 return replmd_replicated_apply_next(ar);
2180         }
2181
2182         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
2183                   ar->index_current, msg->num_elements);
2184
2185         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
2186         if (ret != LDB_SUCCESS) {
2187                 return replmd_replicated_request_error(ar, ret);
2188         }
2189
2190         for (i=0; i<ni; i++) {
2191                 nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
2192         }
2193
2194         /* create the meta data value */
2195         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
2196                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
2197                                        &nmd,
2198                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
2199         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2200                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2201                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2202         }
2203
2204         /*
2205          * when we know that we'll modify the record, add the whenChanged, uSNChanged
2206          * and replPopertyMetaData attributes
2207          */
2208         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
2209         if (ret != LDB_SUCCESS) {
2210                 return replmd_replicated_request_error(ar, ret);
2211         }
2212         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
2213         if (ret != LDB_SUCCESS) {
2214                 return replmd_replicated_request_error(ar, ret);
2215         }
2216         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
2217         if (ret != LDB_SUCCESS) {
2218                 return replmd_replicated_request_error(ar, ret);
2219         }
2220
2221         replmd_ldb_message_sort(msg, ar->schema);
2222
2223         /* we want to replace the old values */
2224         for (i=0; i < msg->num_elements; i++) {
2225                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
2226         }
2227
2228         if (DEBUGLVL(4)) {
2229                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
2230                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
2231                 talloc_free(s);
2232         }
2233
2234         ret = ldb_build_mod_req(&change_req,
2235                                 ldb,
2236                                 ar,
2237                                 msg,
2238                                 ar->controls,
2239                                 ar,
2240                                 replmd_op_callback,
2241                                 ar->req);
2242         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2243
2244         return ldb_next_request(ar->module, change_req);
2245 }
2246
2247 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
2248                                                    struct ldb_reply *ares)
2249 {
2250         struct replmd_replicated_request *ar = talloc_get_type(req->context,
2251                                                struct replmd_replicated_request);
2252         int ret;
2253
2254         if (!ares) {
2255                 return ldb_module_done(ar->req, NULL, NULL,
2256                                         LDB_ERR_OPERATIONS_ERROR);
2257         }
2258         if (ares->error != LDB_SUCCESS &&
2259             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
2260                 return ldb_module_done(ar->req, ares->controls,
2261                                         ares->response, ares->error);
2262         }
2263
2264         switch (ares->type) {
2265         case LDB_REPLY_ENTRY:
2266                 ar->search_msg = talloc_steal(ar, ares->message);
2267                 break;
2268
2269         case LDB_REPLY_REFERRAL:
2270                 /* we ignore referrals */
2271                 break;
2272
2273         case LDB_REPLY_DONE:
2274                 if (ar->search_msg != NULL) {
2275                         ret = replmd_replicated_apply_merge(ar);
2276                 } else {
2277                         ret = replmd_replicated_apply_add(ar);
2278                 }
2279                 if (ret != LDB_SUCCESS) {
2280                         return ldb_module_done(ar->req, NULL, NULL, ret);
2281                 }
2282         }
2283
2284         talloc_free(ares);
2285         return LDB_SUCCESS;
2286 }
2287
2288 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
2289
2290 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
2291 {
2292         struct ldb_context *ldb;
2293         int ret;
2294         char *tmp_str;
2295         char *filter;
2296         struct ldb_request *search_req;
2297         struct ldb_search_options_control *options;
2298
2299         if (ar->index_current >= ar->objs->num_objects) {
2300                 /* done with it, go to next stage */
2301                 return replmd_replicated_uptodate_vector(ar);
2302         }
2303
2304         ldb = ldb_module_get_ctx(ar->module);
2305         ar->search_msg = NULL;
2306
2307         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
2308         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2309
2310         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
2311         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2312         talloc_free(tmp_str);
2313
2314         ret = ldb_build_search_req(&search_req,
2315                                    ldb,
2316                                    ar,
2317                                    NULL,
2318                                    LDB_SCOPE_SUBTREE,
2319                                    filter,
2320                                    NULL,
2321                                    NULL,
2322                                    ar,
2323                                    replmd_replicated_apply_search_callback,
2324                                    ar->req);
2325
2326         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
2327         if (ret != LDB_SUCCESS) {
2328                 return ret;
2329         }
2330
2331         /* we need to cope with cross-partition links, so search for
2332            the GUID over all partitions */
2333         options = talloc(search_req, struct ldb_search_options_control);
2334         if (options == NULL) {
2335                 DEBUG(0, (__location__ ": out of memory\n"));
2336                 return LDB_ERR_OPERATIONS_ERROR;
2337         }
2338         options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
2339
2340         ret = ldb_request_add_control(search_req,
2341                                       LDB_CONTROL_SEARCH_OPTIONS_OID,
2342                                       true, options);
2343         if (ret != LDB_SUCCESS) {
2344                 return ret;
2345         }
2346
2347         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2348
2349         return ldb_next_request(ar->module, search_req);
2350 }
2351
2352 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
2353                                                       struct ldb_reply *ares)
2354 {
2355         struct ldb_context *ldb;
2356         struct replmd_replicated_request *ar = talloc_get_type(req->context,
2357                                                struct replmd_replicated_request);
2358         ldb = ldb_module_get_ctx(ar->module);
2359
2360         if (!ares) {
2361                 return ldb_module_done(ar->req, NULL, NULL,
2362                                         LDB_ERR_OPERATIONS_ERROR);
2363         }
2364         if (ares->error != LDB_SUCCESS) {
2365                 return ldb_module_done(ar->req, ares->controls,
2366                                         ares->response, ares->error);
2367         }
2368
2369         if (ares->type != LDB_REPLY_DONE) {
2370                 ldb_set_errstring(ldb, "Invalid reply type\n!");
2371                 return ldb_module_done(ar->req, NULL, NULL,
2372                                         LDB_ERR_OPERATIONS_ERROR);
2373         }
2374
2375         talloc_free(ares);
2376
2377         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
2378 }
2379
2380 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
2381 {
2382         struct ldb_context *ldb;
2383         struct ldb_request *change_req;
2384         enum ndr_err_code ndr_err;
2385         struct ldb_message *msg;
2386         struct replUpToDateVectorBlob ouv;
2387         const struct ldb_val *ouv_value;
2388         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
2389         struct replUpToDateVectorBlob nuv;
2390         struct ldb_val nuv_value;
2391         struct ldb_message_element *nuv_el = NULL;
2392         const struct GUID *our_invocation_id;
2393         struct ldb_message_element *orf_el = NULL;
2394         struct repsFromToBlob nrf;
2395         struct ldb_val *nrf_value = NULL;
2396         struct ldb_message_element *nrf_el = NULL;
2397         uint32_t i,j,ni=0;
2398         bool found = false;
2399         time_t t = time(NULL);
2400         NTTIME now;
2401         int ret;
2402
2403         ldb = ldb_module_get_ctx(ar->module);
2404         ruv = ar->objs->uptodateness_vector;
2405         ZERO_STRUCT(ouv);
2406         ouv.version = 2;
2407         ZERO_STRUCT(nuv);
2408         nuv.version = 2;
2409
2410         unix_to_nt_time(&now, t);
2411
2412         /*
2413          * first create the new replUpToDateVector
2414          */
2415         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
2416         if (ouv_value) {
2417                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
2418                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
2419                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
2420                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2421                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2422                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2423                 }
2424
2425                 if (ouv.version != 2) {
2426                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2427                 }
2428         }
2429
2430         /*
2431          * the new uptodateness vector will at least
2432          * contain 1 entry, one for the source_dsa
2433          *
2434          * plus optional values from our old vector and the one from the source_dsa
2435          */
2436         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
2437         if (ruv) nuv.ctr.ctr2.count += ruv->count;
2438         nuv.ctr.ctr2.cursors = talloc_array(ar,
2439                                             struct drsuapi_DsReplicaCursor2,
2440                                             nuv.ctr.ctr2.count);
2441         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2442
2443         /* first copy the old vector */
2444         for (i=0; i < ouv.ctr.ctr2.count; i++) {
2445                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
2446                 ni++;
2447         }
2448
2449         /* get our invocation_id if we have one already attached to the ldb */
2450         our_invocation_id = samdb_ntds_invocation_id(ldb);
2451
2452         /* merge in the source_dsa vector is available */
2453         for (i=0; (ruv && i < ruv->count); i++) {
2454                 found = false;
2455
2456                 if (our_invocation_id &&
2457                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
2458                                our_invocation_id)) {
2459                         continue;
2460                 }
2461
2462                 for (j=0; j < ni; j++) {
2463                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
2464                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
2465                                 continue;
2466                         }
2467
2468                         found = true;
2469
2470                         /*
2471                          * we update only the highest_usn and not the latest_sync_success time,
2472                          * because the last success stands for direct replication
2473                          */
2474                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
2475                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
2476                         }
2477                         break;                  
2478                 }
2479
2480                 if (found) continue;
2481
2482                 /* if it's not there yet, add it */
2483                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
2484                 ni++;
2485         }
2486
2487         /*
2488          * merge in the current highwatermark for the source_dsa
2489          */
2490         found = false;
2491         for (j=0; j < ni; j++) {
2492                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
2493                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
2494                         continue;
2495                 }
2496
2497                 found = true;
2498
2499                 /*
2500                  * here we update the highest_usn and last_sync_success time
2501                  * because we're directly replicating from the source_dsa
2502                  *
2503                  * and use the tmp_highest_usn because this is what we have just applied
2504                  * to our ldb
2505                  */
2506                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
2507                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
2508                 break;
2509         }
2510         if (!found) {
2511                 /*
2512                  * here we update the highest_usn and last_sync_success time
2513                  * because we're directly replicating from the source_dsa
2514                  *
2515                  * and use the tmp_highest_usn because this is what we have just applied
2516                  * to our ldb
2517                  */
2518                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
2519                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
2520                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
2521                 ni++;
2522         }
2523
2524         /*
2525          * finally correct the size of the cursors array
2526          */
2527         nuv.ctr.ctr2.count = ni;
2528
2529         /*
2530          * sort the cursors
2531          */
2532         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
2533               sizeof(struct drsuapi_DsReplicaCursor2),
2534               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
2535
2536         /*
2537          * create the change ldb_message
2538          */
2539         msg = ldb_msg_new(ar);
2540         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2541         msg->dn = ar->search_msg->dn;
2542
2543         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
2544                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2545                                        &nuv,
2546                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
2547         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2548                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2549                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2550         }
2551         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
2552         if (ret != LDB_SUCCESS) {
2553                 return replmd_replicated_request_error(ar, ret);
2554         }
2555         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
2556
2557         /*
2558          * now create the new repsFrom value from the given repsFromTo1 structure
2559          */
2560         ZERO_STRUCT(nrf);
2561         nrf.version                                     = 1;
2562         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
2563         /* and fix some values... */
2564         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
2565         nrf.ctr.ctr1.last_success                       = now;
2566         nrf.ctr.ctr1.last_attempt                       = now;
2567         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
2568         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
2569
2570         /*
2571          * first see if we already have a repsFrom value for the current source dsa
2572          * if so we'll later replace this value
2573          */
2574         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
2575         if (orf_el) {
2576                 for (i=0; i < orf_el->num_values; i++) {
2577                         struct repsFromToBlob *trf;
2578
2579                         trf = talloc(ar, struct repsFromToBlob);
2580                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2581
2582                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
2583                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
2584                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2585                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2586                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2587                         }
2588
2589                         if (trf->version != 1) {
2590                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2591                         }
2592
2593                         /*
2594                          * we compare the source dsa objectGUID not the invocation_id
2595                          * because we want only one repsFrom value per source dsa
2596                          * and when the invocation_id of the source dsa has changed we don't need 
2597                          * the old repsFrom with the old invocation_id
2598                          */
2599                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
2600                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
2601                                 talloc_free(trf);
2602                                 continue;
2603                         }
2604
2605                         talloc_free(trf);
2606                         nrf_value = &orf_el->values[i];
2607                         break;
2608                 }
2609
2610                 /*
2611                  * copy over all old values to the new ldb_message
2612                  */
2613                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
2614                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2615                 *nrf_el = *orf_el;
2616         }
2617
2618         /*
2619          * if we haven't found an old repsFrom value for the current source dsa
2620          * we'll add a new value
2621          */
2622         if (!nrf_value) {
2623                 struct ldb_val zero_value;
2624                 ZERO_STRUCT(zero_value);
2625                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
2626                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2627
2628                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
2629         }
2630
2631         /* we now fill the value which is already attached to ldb_message */
2632         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
2633                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
2634                                        &nrf,
2635                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
2636         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2637                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
2638                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
2639         }
2640
2641         /* 
2642          * the ldb_message_element for the attribute, has all the old values and the new one
2643          * so we'll replace the whole attribute with all values
2644          */
2645         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
2646
2647         if (DEBUGLVL(4)) {
2648                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
2649                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
2650                 talloc_free(s);
2651         }
2652
2653         /* prepare the ldb_modify() request */
2654         ret = ldb_build_mod_req(&change_req,
2655                                 ldb,
2656                                 ar,
2657                                 msg,
2658                                 ar->controls,
2659                                 ar,
2660                                 replmd_replicated_uptodate_modify_callback,
2661                                 ar->req);
2662         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2663
2664         return ldb_next_request(ar->module, change_req);
2665 }
2666
2667 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
2668                                                       struct ldb_reply *ares)
2669 {
2670         struct replmd_replicated_request *ar = talloc_get_type(req->context,
2671                                                struct replmd_replicated_request);
2672         int ret;
2673
2674         if (!ares) {
2675                 return ldb_module_done(ar->req, NULL, NULL,
2676                                         LDB_ERR_OPERATIONS_ERROR);
2677         }
2678         if (ares->error != LDB_SUCCESS &&
2679             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
2680                 return ldb_module_done(ar->req, ares->controls,
2681                                         ares->response, ares->error);
2682         }
2683
2684         switch (ares->type) {
2685         case LDB_REPLY_ENTRY:
2686                 ar->search_msg = talloc_steal(ar, ares->message);
2687                 break;
2688
2689         case LDB_REPLY_REFERRAL:
2690                 /* we ignore referrals */
2691                 break;
2692
2693         case LDB_REPLY_DONE:
2694                 if (ar->search_msg == NULL) {
2695                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
2696                 } else {
2697                         ret = replmd_replicated_uptodate_modify(ar);
2698                 }
2699                 if (ret != LDB_SUCCESS) {
2700                         return ldb_module_done(ar->req, NULL, NULL, ret);
2701                 }
2702         }
2703
2704         talloc_free(ares);
2705         return LDB_SUCCESS;
2706 }
2707
2708
2709 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
2710 {
2711         struct ldb_context *ldb;
2712         int ret;
2713         static const char *attrs[] = {
2714                 "replUpToDateVector",
2715                 "repsFrom",
2716                 NULL
2717         };
2718         struct ldb_request *search_req;
2719
2720         ldb = ldb_module_get_ctx(ar->module);
2721         ar->search_msg = NULL;
2722
2723         ret = ldb_build_search_req(&search_req,
2724                                    ldb,
2725                                    ar,
2726                                    ar->objs->partition_dn,
2727                                    LDB_SCOPE_BASE,
2728                                    "(objectClass=*)",
2729                                    attrs,
2730                                    NULL,
2731                                    ar,
2732                                    replmd_replicated_uptodate_search_callback,
2733                                    ar->req);
2734         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
2735
2736         return ldb_next_request(ar->module, search_req);
2737 }
2738
2739
2740
2741 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
2742 {
2743         struct ldb_context *ldb;
2744         struct dsdb_extended_replicated_objects *objs;
2745         struct replmd_replicated_request *ar;
2746         struct ldb_control **ctrls;
2747         int ret, i;
2748         struct replmd_private *replmd_private = 
2749                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2750
2751         ldb = ldb_module_get_ctx(module);
2752
2753         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
2754
2755         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
2756         if (!objs) {
2757                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
2758                 return LDB_ERR_PROTOCOL_ERROR;
2759         }
2760
2761         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
2762                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
2763                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
2764                 return LDB_ERR_PROTOCOL_ERROR;
2765         }
2766
2767         ar = replmd_ctx_init(module, req);
2768         if (!ar)
2769                 return LDB_ERR_OPERATIONS_ERROR;
2770
2771         /* Set the flags to have the replmd_op_callback run over the full set of objects */
2772         ar->apply_mode = true;
2773         ar->objs = objs;
2774         ar->schema = dsdb_get_schema(ldb);
2775         if (!ar->schema) {
2776                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
2777                 talloc_free(ar);
2778                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
2779                 return LDB_ERR_CONSTRAINT_VIOLATION;
2780         }
2781
2782         ctrls = req->controls;
2783
2784         if (req->controls) {
2785                 req->controls = talloc_memdup(ar, req->controls,
2786                                               talloc_get_size(req->controls));
2787                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2788         }
2789
2790         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
2791         if (ret != LDB_SUCCESS) {
2792                 return ret;
2793         }
2794
2795         ar->controls = req->controls;
2796         req->controls = ctrls;
2797
2798         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
2799
2800         /* save away the linked attributes for the end of the
2801            transaction */
2802         for (i=0; i<ar->objs->linked_attributes_count; i++) {
2803                 struct la_entry *la_entry;
2804
2805                 if (replmd_private->la_ctx == NULL) {
2806                         replmd_private->la_ctx = talloc_new(replmd_private);
2807                 }
2808                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
2809                 if (la_entry == NULL) {
2810                         ldb_oom(ldb);
2811                         return LDB_ERR_OPERATIONS_ERROR;
2812                 }
2813                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2814                 if (la_entry->la == NULL) {
2815                         talloc_free(la_entry);
2816                         ldb_oom(ldb);
2817                         return LDB_ERR_OPERATIONS_ERROR;
2818                 }
2819                 *la_entry->la = ar->objs->linked_attributes[i];
2820
2821                 /* we need to steal the non-scalars so they stay
2822                    around until the end of the transaction */
2823                 talloc_steal(la_entry->la, la_entry->la->identifier);
2824                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2825
2826                 DLIST_ADD(replmd_private->la_list, la_entry);
2827         }
2828
2829         return replmd_replicated_apply_next(ar);
2830 }
2831
2832 /*
2833   process one linked attribute structure
2834  */
2835 static int replmd_process_linked_attribute(struct ldb_module *module,
2836                                            struct la_entry *la_entry)
2837 {                                          
2838         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2839         struct ldb_context *ldb = ldb_module_get_ctx(module);
2840         struct dsdb_schema *schema = dsdb_get_schema(ldb);
2841         struct drsuapi_DsReplicaObjectIdentifier3 target;
2842         struct ldb_message *msg;
2843         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2844         struct ldb_request *mod_req;
2845         int ret;
2846         const struct dsdb_attribute *attr;
2847         struct ldb_dn *target_dn;
2848         struct dsdb_dn *dsdb_dn;
2849         uint64_t seq_num = 0;
2850         struct drsuapi_DsReplicaAttribute drs;
2851         struct drsuapi_DsAttributeValue val;
2852         struct ldb_message_element el;
2853         const struct ldb_val *guid;
2854         WERROR status;
2855         time_t t = time(NULL);
2856
2857         drs.value_ctr.num_values = 1;
2858         drs.value_ctr.values = &val;
2859         val.blob = la->value.blob;
2860
2861 /*
2862 linked_attributes[0]:                                                     
2863      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2864         identifier               : *                                      
2865             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2866                 __ndr_size               : 0x0000003a (58)                
2867                 __ndr_size_sid           : 0x00000000 (0)                 
2868                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2869                 sid                      : S-0-0                               
2870                 __ndr_size_dn            : 0x00000000 (0)                      
2871                 dn                       : ''                                  
2872         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2873         value: struct drsuapi_DsAttributeValue                                 
2874             __ndr_size               : 0x0000007e (126)                        
2875             blob                     : *                                       
2876                 blob                     : DATA_BLOB length=126                
2877         flags                    : 0x00000001 (1)                              
2878                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2879         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2880         meta_data: struct drsuapi_DsReplicaMetaData                            
2881             version                  : 0x00000015 (21)                         
2882             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2883             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2884             originating_usn          : 0x000000000001e19c (123292)             
2885
2886 (for cases where the link is to a normal DN)
2887      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2888         __ndr_size               : 0x0000007e (126)                            
2889         __ndr_size_sid           : 0x0000001c (28)                             
2890         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2891         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2892         __ndr_size_dn            : 0x00000022 (34)                                
2893         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2894  */
2895         
2896         /* find the attribute being modified */
2897         attr = dsdb_attribute_by_attributeID_id(schema, la->attid);
2898         if (attr == NULL) {
2899                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2900                 talloc_free(tmp_ctx);
2901                 return LDB_ERR_OPERATIONS_ERROR;
2902         }
2903
2904         status = attr->syntax->drsuapi_to_ldb(ldb, schema, attr, &drs, tmp_ctx, &el);
2905
2906         /* construct a modify request for this attribute change */
2907         msg = ldb_msg_new(tmp_ctx);
2908         if (!msg) {
2909                 ldb_oom(ldb);
2910                 talloc_free(tmp_ctx);
2911                 return LDB_ERR_OPERATIONS_ERROR;
2912         }
2913
2914         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2915                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2916         if (ret != LDB_SUCCESS) {
2917                 talloc_free(tmp_ctx);
2918                 return ret;
2919         }
2920
2921         el.name = attr->lDAPDisplayName;
2922         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2923                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_ADD);
2924         } else {
2925                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_DELETE);
2926         }
2927         if (ret != LDB_SUCCESS) {
2928                 talloc_free(tmp_ctx);
2929                 return ret;
2930         }
2931
2932         dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, &el.values[0], attr->syntax->ldap_oid);
2933         if (!dsdb_dn) {
2934                 DEBUG(0,(__location__ ": Failed to parse just-generated DN\n"));
2935                 talloc_free(tmp_ctx);
2936                 return LDB_ERR_INVALID_DN_SYNTAX;
2937         }
2938
2939         guid = ldb_dn_get_extended_component(dsdb_dn->dn, "GUID");
2940         if (!guid) {
2941                 DEBUG(0,(__location__ ": Failed to parse GUID from just-generated DN\n"));
2942                 talloc_free(tmp_ctx);
2943                 return LDB_ERR_INVALID_DN_SYNTAX;
2944         }
2945
2946         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, ldb_binary_encode(tmp_ctx, *guid), &target_dn);
2947         if (ret != LDB_SUCCESS) {
2948                 /* If this proves to be a problem in the future, then
2949                  * just remove the return - perhaps we can just use
2950                  * the details the replication peer supplied */
2951
2952                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
2953                 talloc_free(tmp_ctx);
2954                 return LDB_ERR_OPERATIONS_ERROR;
2955         } else {
2956
2957                 /* Now update with full DN we just found in the DB (including extended components) */
2958                 dsdb_dn->dn = target_dn;
2959                 /* Now make a linearized version, using the original binary components (if any) */
2960                 el.values[0] = data_blob_string_const(dsdb_dn_get_extended_linearized(tmp_ctx, dsdb_dn, 1));
2961         }
2962
2963         ret = replmd_update_rpmd(module, schema, msg, &seq_num, t);
2964         if (ret != LDB_SUCCESS) {
2965                 talloc_free(tmp_ctx);
2966                 return ret;
2967         }
2968
2969         /* we only change whenChanged and uSNChanged if the seq_num
2970            has changed */
2971         if (seq_num != 0) {
2972                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
2973                         talloc_free(tmp_ctx);
2974                         return LDB_ERR_OPERATIONS_ERROR;
2975                 }
2976
2977                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
2978                         talloc_free(tmp_ctx);
2979                         return LDB_ERR_OPERATIONS_ERROR;
2980                 }
2981         }
2982
2983         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2984                                 msg,
2985                                 NULL,
2986                                 NULL, 
2987                                 ldb_op_default_callback,
2988                                 NULL);
2989         if (ret != LDB_SUCCESS) {
2990                 talloc_free(tmp_ctx);
2991                 return ret;
2992         }
2993         talloc_steal(mod_req, msg);
2994
2995         if (DEBUGLVL(4)) {
2996                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2997                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2998         }
2999
3000         /* Run the new request */
3001         ret = ldb_next_request(module, mod_req);
3002
3003         /* we need to wait for this to finish, as we are being called
3004            from the synchronous end_transaction hook of this module */
3005         if (ret == LDB_SUCCESS) {
3006                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
3007         }
3008
3009         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
3010                 /* the link destination exists, we need to update it
3011                  * by deleting the old one for the same DN then adding
3012                  * the new one */
3013                 msg->elements = talloc_realloc(msg, msg->elements,
3014                                                struct ldb_message_element,
3015                                                msg->num_elements+1);
3016                 if (msg->elements == NULL) {
3017                         ldb_oom(ldb);
3018                         talloc_free(tmp_ctx);
3019                         return LDB_ERR_OPERATIONS_ERROR;
3020                 }
3021                 /* this relies on the backend matching the old entry
3022                    only by the DN portion of the extended DN */
3023                 msg->elements[1] = msg->elements[0];
3024                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
3025                 msg->num_elements++;
3026
3027                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
3028                                         msg,
3029                                         NULL,
3030                                         NULL, 
3031                                         ldb_op_default_callback,
3032                                         NULL);
3033                 if (ret != LDB_SUCCESS) {
3034                         talloc_free(tmp_ctx);
3035                         return ret;
3036                 }
3037
3038                 /* Run the new request */
3039                 ret = ldb_next_request(module, mod_req);
3040                 
3041                 if (ret == LDB_SUCCESS) {
3042                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
3043                 }
3044         }
3045
3046         if (ret != LDB_SUCCESS) {
3047                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
3048                           ldb_errstring(ldb),
3049                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
3050                 ret = LDB_SUCCESS;
3051         }
3052         
3053         talloc_free(tmp_ctx);
3054
3055         return ret;     
3056 }
3057
3058 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
3059 {
3060         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
3061                 return replmd_extended_replicated_objects(module, req);
3062         }
3063
3064         return ldb_next_request(module, req);
3065 }
3066
3067
3068 /*
3069   we hook into the transaction operations to allow us to 
3070   perform the linked attribute updates at the end of the whole
3071   transaction. This allows a forward linked attribute to be created
3072   before the object is created. During a vampire, w2k8 sends us linked
3073   attributes before the objects they are part of.
3074  */
3075 static int replmd_start_transaction(struct ldb_module *module)
3076 {
3077         /* create our private structure for this transaction */
3078         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
3079                                                                 struct replmd_private);
3080         replmd_txn_cleanup(replmd_private);
3081
3082         /* free any leftover mod_usn records from cancelled
3083            transactions */
3084         while (replmd_private->ncs) {
3085                 struct nc_entry *e = replmd_private->ncs;
3086                 DLIST_REMOVE(replmd_private->ncs, e);
3087                 talloc_free(e);
3088         }
3089
3090         return ldb_next_start_trans(module);
3091 }
3092
3093 /*
3094   on prepare commit we loop over our queued la_context structures and
3095   apply each of them  
3096  */
3097 static int replmd_prepare_commit(struct ldb_module *module)
3098 {
3099         struct replmd_private *replmd_private = 
3100                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
3101         struct la_entry *la, *prev;
3102         struct la_backlink *bl;
3103         int ret;
3104
3105         /* walk the list backwards, to do the first entry first, as we
3106          * added the entries with DLIST_ADD() which puts them at the
3107          * start of the list */
3108         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
3109
3110         for (; la; la=prev) {
3111                 prev = la->prev;
3112                 DLIST_REMOVE(replmd_private->la_list, la);
3113                 ret = replmd_process_linked_attribute(module, la);
3114                 if (ret != LDB_SUCCESS) {
3115                         replmd_txn_cleanup(replmd_private);
3116                         return ret;
3117                 }
3118         }
3119
3120         /* process our backlink list, creating and deleting backlinks
3121            as necessary */
3122         for (bl=replmd_private->la_backlinks; bl; bl=bl->next) {
3123                 ret = replmd_process_backlink(module, bl);
3124                 if (ret != LDB_SUCCESS) {
3125                         replmd_txn_cleanup(replmd_private);
3126                         return ret;
3127                 }
3128         }
3129
3130         replmd_txn_cleanup(replmd_private);
3131
3132         /* possibly change @REPLCHANGED */
3133         ret = replmd_notify_store(module);
3134         if (ret != LDB_SUCCESS) {
3135                 return ret;
3136         }
3137         
3138         return ldb_next_prepare_commit(module);
3139 }
3140
3141 static int replmd_del_transaction(struct ldb_module *module)
3142 {
3143         struct replmd_private *replmd_private = 
3144                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
3145         replmd_txn_cleanup(replmd_private);
3146
3147         return ldb_next_del_trans(module);
3148 }
3149
3150
3151 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
3152         .name          = "repl_meta_data",
3153         .init_context      = replmd_init,
3154         .add               = replmd_add,
3155         .modify            = replmd_modify,
3156         .rename            = replmd_rename,
3157         .extended          = replmd_extended,
3158         .start_transaction = replmd_start_transaction,
3159         .prepare_commit    = replmd_prepare_commit,
3160         .del_transaction   = replmd_del_transaction,
3161 };