replmd: Allow missing targets if GET_TGT has already been set
[nivanova/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /*
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005-2013
6    Copyright (C) Andrew Tridgell 2005-2009
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8    Copyright (C) Matthieu Patou <mat@samba.org> 2010-2011
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb repl_meta_data module
28  *
29  *  Description: - add a unique objectGUID onto every new record,
30  *               - handle whenCreated, whenChanged timestamps
31  *               - handle uSNCreated, uSNChanged numbers
32  *               - handle replPropertyMetaData attribute
33  *
34  *  Author: Simo Sorce
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb_module.h"
40 #include "dsdb/samdb/samdb.h"
41 #include "dsdb/common/proto.h"
42 #include "dsdb/common/util.h"
43 #include "../libds/common/flags.h"
44 #include "librpc/gen_ndr/irpc.h"
45 #include "librpc/gen_ndr/ndr_misc.h"
46 #include "librpc/gen_ndr/ndr_drsuapi.h"
47 #include "librpc/gen_ndr/ndr_drsblobs.h"
48 #include "param/param.h"
49 #include "libcli/security/security.h"
50 #include "lib/util/dlinklist.h"
51 #include "dsdb/samdb/ldb_modules/util.h"
52 #include "lib/util/tsort.h"
53
54 #undef DBGC_CLASS
55 #define DBGC_CLASS            DBGC_DRS_REPL
56
57 /*
58  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
59  * Deleted Objects Container
60  */
61 static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000ULL;
62
/*
 * Module-private state, allocated in replmd_init() and attached to the
 * module with ldb_module_set_private().
 */
struct replmd_private {
        /* talloc context released (and reset to NULL) by replmd_txn_cleanup() */
        TALLOC_CTX *la_ctx;
        /* list head reset to NULL by replmd_txn_cleanup() */
        struct la_entry *la_list;
        /*
         * One entry per partition touched by a write; filled in by
         * replmd_op_callback() and drained by replmd_notify_store().
         */
        struct nc_entry {
                struct nc_entry *prev, *next;
                struct ldb_dn *dn;
                /* highest seq_num recorded for this partition */
                uint64_t mod_usn;
                /* seq_num of the latest update that was flagged urgent */
                uint64_t mod_usn_urgent;
        } *ncs;
        /* schema base DN, from ldb_get_schema_basedn() in replmd_init() */
        struct ldb_dn *schema_dn;
        /* set by replmd_op_callback() after a non-apply-mode write with seq_num > 0 */
        bool originating_updates;
        /* whether @SAMBA_DSDB advertises SAMBA_SORTED_LINKS_FEATURE (see replmd_init()) */
        bool sorted_links;
};
76
/*
 * Doubly-linked list node wrapping one replicated linked attribute.
 * NOTE(review): presumably queued on replmd_private->la_list until the
 * end of the transaction - confirm against the code that fills the list.
 */
struct la_entry {
        struct la_entry *next, *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
        uint32_t dsdb_repl_flags;
};
82
/*
 * Per-request context created by replmd_ctx_init() and used as the
 * callback context in replmd_op_callback().
 */
struct replmd_replicated_request {
        struct ldb_module *module;
        /* the original request this context was created for */
        struct ldb_request *req;

        const struct dsdb_schema *schema;
        /* copy of samdb_ntds_invocation_id(), taken in replmd_ctx_init() */
        struct GUID our_invocation_id;

        /* the controls we pass down */
        struct ldb_control **controls;

        /*
         * Backlinks for the replmd_add() case (we want to create
         * backlinks after creating the user, but before the end of
         * the ADD request)
         */
        struct la_backlink *la_backlinks;

        /* details for the mode where we apply a bunch of inbound replication messages */
        bool apply_mode;
        uint32_t index_current;
        struct dsdb_extended_replicated_objects *objs;

        /* NULL is reported as "ADD", non-NULL as "MODIFY" in replmd_op_callback() */
        struct ldb_message *search_msg;
        struct GUID local_parent_guid;

        /* when > 0, recorded as the partition's mod_usn by replmd_op_callback() */
        uint64_t seq_num;
        /* when set, seq_num is also recorded as mod_usn_urgent */
        bool is_urgent;

        bool isDeleted;
};
113
114 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
115 static int replmd_delete_internals(struct ldb_module *module, struct ldb_request *req, bool re_delete);
116 static int replmd_check_upgrade_links(struct ldb_context *ldb,
117                                       struct parsed_dn *dns, uint32_t count,
118                                       struct ldb_message_element *el,
119                                       const char *ldap_oid);
120 static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
121                                           struct la_entry *la);
122
/*
 * Bit flags describing which kind of change makes an object class
 * urgent for replication; values are OR-ed together in urgent_objects[].
 */
enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1 << 0,
        REPL_URGENT_ON_UPDATE = 1 << 1,
        REPL_URGENT_ON_DELETE = 1 << 2
};
128
/*
 * Deletion life-cycle states reported by replmd_deletion_state().
 * Numbering starts at 1 and increases in declaration order.
 */
enum deletion_state {
        OBJECT_NOT_DELETED = 1,
        OBJECT_DELETED,
        OBJECT_RECYCLED,
        OBJECT_TOMBSTONE,
        OBJECT_REMOVED
};
136
137 static void replmd_deletion_state(struct ldb_module *module,
138                                   const struct ldb_message *msg,
139                                   enum deletion_state *current_state,
140                                   enum deletion_state *next_state)
141 {
142         int ret;
143         bool enabled = false;
144
145         if (msg == NULL) {
146                 *current_state = OBJECT_REMOVED;
147                 if (next_state != NULL) {
148                         *next_state = OBJECT_REMOVED;
149                 }
150                 return;
151         }
152
153         ret = dsdb_recyclebin_enabled(module, &enabled);
154         if (ret != LDB_SUCCESS) {
155                 enabled = false;
156         }
157
158         if (ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")) {
159                 if (!enabled) {
160                         *current_state = OBJECT_TOMBSTONE;
161                         if (next_state != NULL) {
162                                 *next_state = OBJECT_REMOVED;
163                         }
164                         return;
165                 }
166
167                 if (ldb_msg_check_string_attribute(msg, "isRecycled", "TRUE")) {
168                         *current_state = OBJECT_RECYCLED;
169                         if (next_state != NULL) {
170                                 *next_state = OBJECT_REMOVED;
171                         }
172                         return;
173                 }
174
175                 *current_state = OBJECT_DELETED;
176                 if (next_state != NULL) {
177                         *next_state = OBJECT_RECYCLED;
178                 }
179                 return;
180         }
181
182         *current_state = OBJECT_NOT_DELETED;
183         if (next_state == NULL) {
184                 return;
185         }
186
187         if (enabled) {
188                 *next_state = OBJECT_DELETED;
189         } else {
190                 *next_state = OBJECT_TOMBSTONE;
191         }
192 }
193
/*
 * Object classes that trigger urgent replication, each paired with the
 * situations (bitmask of enum urgent_situation) in which it does so.
 * The table is terminated by an entry whose update_name is NULL;
 * replmd_check_urgent_objectclass() walks it.
 */
static const struct {
        const char *update_name;
        enum urgent_situation repl_situation;
} urgent_objects[] = {
                {"nTDSDSA", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_DELETE)},
                {"crossRef", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_DELETE)},
                {"attributeSchema", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
                {"classSchema", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
                {"secret", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
                {"rIDManager", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
                {NULL, 0}
};
206
/*
 * Attributes looked for when updating or deleting, to check whether an
 * urgent replication is needed.  NULL-terminated; consulted by
 * replmd_check_urgent_attribute().
 */
static const char *urgent_attrs[] = {
                "lockoutTime",
                "pwdLastSet",
                "userAccountControl",
                NULL
};
214
215
216 static bool replmd_check_urgent_objectclass(const struct ldb_message_element *objectclass_el,
217                                         enum urgent_situation situation)
218 {
219         unsigned int i, j;
220         for (i=0; urgent_objects[i].update_name; i++) {
221
222                 if ((situation & urgent_objects[i].repl_situation) == 0) {
223                         continue;
224                 }
225
226                 for (j=0; j<objectclass_el->num_values; j++) {
227                         const struct ldb_val *v = &objectclass_el->values[j];
228                         if (ldb_attr_cmp((const char *)v->data, urgent_objects[i].update_name) == 0) {
229                                 return true;
230                         }
231                 }
232         }
233         return false;
234 }
235
236 static bool replmd_check_urgent_attribute(const struct ldb_message_element *el)
237 {
238         if (ldb_attr_in_list(urgent_attrs, el->name)) {
239                 return true;
240         }
241         return false;
242 }
243
244 static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar);
245
246 /*
247   initialise the module
248   allocate the private structure and build the list
249   of partition DNs for use by replmd_notify()
250  */
251 static int replmd_init(struct ldb_module *module)
252 {
253         struct replmd_private *replmd_private;
254         struct ldb_context *ldb = ldb_module_get_ctx(module);
255         static const char *samba_dsdb_attrs[] = { SAMBA_COMPATIBLE_FEATURES_ATTR, NULL };
256         struct ldb_dn *samba_dsdb_dn;
257         struct ldb_result *res;
258         int ret;
259         TALLOC_CTX *frame = talloc_stackframe();
260         replmd_private = talloc_zero(module, struct replmd_private);
261         if (replmd_private == NULL) {
262                 ldb_oom(ldb);
263                 TALLOC_FREE(frame);
264                 return LDB_ERR_OPERATIONS_ERROR;
265         }
266         ldb_module_set_private(module, replmd_private);
267
268         replmd_private->schema_dn = ldb_get_schema_basedn(ldb);
269
270         samba_dsdb_dn = ldb_dn_new(frame, ldb, "@SAMBA_DSDB");
271         if (!samba_dsdb_dn) {
272                 TALLOC_FREE(frame);
273                 return ldb_oom(ldb);
274         }
275
276         ret = dsdb_module_search_dn(module, frame, &res, samba_dsdb_dn,
277                                     samba_dsdb_attrs, DSDB_FLAG_NEXT_MODULE, NULL);
278         if (ret == LDB_SUCCESS) {
279                 replmd_private->sorted_links
280                         = ldb_msg_check_string_attribute(res->msgs[0],
281                                                          SAMBA_COMPATIBLE_FEATURES_ATTR,
282                                                          SAMBA_SORTED_LINKS_FEATURE);
283         }
284         TALLOC_FREE(frame);
285
286         return ldb_next_init(module);
287 }
288
289 /*
290   cleanup our per-transaction contexts
291  */
292 static void replmd_txn_cleanup(struct replmd_private *replmd_private)
293 {
294         talloc_free(replmd_private->la_ctx);
295         replmd_private->la_list = NULL;
296         replmd_private->la_ctx = NULL;
297
298 }
299
300
/*
 * One pending backlink change, applied by replmd_process_backlink().
 */
struct la_backlink {
        struct la_backlink *next, *prev;
        /* lDAPDisplayName of the backlink attribute on the target object */
        const char *attr_name;
        /* DN of the object holding the forward link */
        struct ldb_dn *forward_dn;
        /* GUID used to look up the target object's DN */
        struct GUID target_guid;
        /* true: add the backlink value; false: delete it */
        bool active;
};
308
309 /*
310   a ldb_modify request operating on modules below the
311   current module
312  */
313 static int linked_attr_modify(struct ldb_module *module,
314                               const struct ldb_message *message,
315                               struct ldb_request *parent)
316 {
317         struct ldb_request *mod_req;
318         int ret;
319         struct ldb_context *ldb = ldb_module_get_ctx(module);
320         TALLOC_CTX *tmp_ctx = talloc_new(module);
321         struct ldb_result *res;
322
323         res = talloc_zero(tmp_ctx, struct ldb_result);
324         if (!res) {
325                 talloc_free(tmp_ctx);
326                 return ldb_oom(ldb_module_get_ctx(module));
327         }
328
329         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
330                                 message,
331                                 NULL,
332                                 res,
333                                 ldb_modify_default_callback,
334                                 parent);
335         LDB_REQ_SET_LOCATION(mod_req);
336         if (ret != LDB_SUCCESS) {
337                 talloc_free(tmp_ctx);
338                 return ret;
339         }
340
341         ret = ldb_request_add_control(mod_req, DSDB_CONTROL_REPLICATED_UPDATE_OID,
342                                       false, NULL);
343         if (ret != LDB_SUCCESS) {
344                 return ret;
345         }
346
347         /* Run the new request */
348         ret = ldb_next_request(module, mod_req);
349
350         if (ret == LDB_SUCCESS) {
351                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
352         }
353
354         talloc_free(tmp_ctx);
355         return ret;
356 }
357
358 /*
359   process a backlinks we accumulated during a transaction, adding and
360   deleting the backlinks from the target objects
361  */
362 static int replmd_process_backlink(struct ldb_module *module, struct la_backlink *bl, struct ldb_request *parent)
363 {
364         struct ldb_dn *target_dn, *source_dn;
365         int ret;
366         struct ldb_context *ldb = ldb_module_get_ctx(module);
367         struct ldb_message *msg;
368         TALLOC_CTX *frame = talloc_stackframe();
369         char *dn_string;
370
371         /*
372           - find DN of target
373           - find DN of source
374           - construct ldb_message
375               - either an add or a delete
376          */
377         ret = dsdb_module_dn_by_guid(module, frame, &bl->target_guid, &target_dn, parent);
378         if (ret != LDB_SUCCESS) {
379                 struct GUID_txt_buf guid_str;
380                 DBG_WARNING("Failed to find target DN for linked attribute with GUID %s\n",
381                             GUID_buf_string(&bl->target_guid, &guid_str));
382                 DBG_WARNING("Please run 'samba-tool dbcheck' to resolve any missing backlinks.\n");
383                 talloc_free(frame);
384                 return LDB_SUCCESS;
385         }
386
387         msg = ldb_msg_new(frame);
388         if (msg == NULL) {
389                 ldb_module_oom(module);
390                 talloc_free(frame);
391                 return LDB_ERR_OPERATIONS_ERROR;
392         }
393
394         source_dn = ldb_dn_copy(frame, bl->forward_dn);
395         if (!source_dn) {
396                 ldb_module_oom(module);
397                 talloc_free(frame);
398                 return LDB_ERR_OPERATIONS_ERROR;
399         } else {
400                 /* Filter down to the attributes we want in the backlink */
401                 const char *accept[] = { "GUID", "SID", NULL };
402                 ldb_dn_extended_filter(source_dn, accept);
403         }
404
405         /* construct a ldb_message for adding/deleting the backlink */
406         msg->dn = target_dn;
407         dn_string = ldb_dn_get_extended_linearized(frame, bl->forward_dn, 1);
408         if (!dn_string) {
409                 ldb_module_oom(module);
410                 talloc_free(frame);
411                 return LDB_ERR_OPERATIONS_ERROR;
412         }
413         ret = ldb_msg_add_steal_string(msg, bl->attr_name, dn_string);
414         if (ret != LDB_SUCCESS) {
415                 talloc_free(frame);
416                 return ret;
417         }
418         msg->elements[0].flags = bl->active?LDB_FLAG_MOD_ADD:LDB_FLAG_MOD_DELETE;
419
420         /* a backlink should never be single valued. Unfortunately the
421            exchange schema has a attribute
422            msExchBridgeheadedLocalConnectorsDNBL which is single
423            valued and a backlink. We need to cope with that by
424            ignoring the single value flag */
425         msg->elements[0].flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
426
427         ret = dsdb_module_modify(module, msg, DSDB_FLAG_NEXT_MODULE, parent);
428         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE && !bl->active) {
429                 /* we allow LDB_ERR_NO_SUCH_ATTRIBUTE as success to
430                    cope with possible corruption where the backlink has
431                    already been removed */
432                 DEBUG(3,("WARNING: backlink from %s already removed from %s - %s\n",
433                          ldb_dn_get_linearized(target_dn),
434                          ldb_dn_get_linearized(source_dn),
435                          ldb_errstring(ldb)));
436                 ret = LDB_SUCCESS;
437         } else if (ret != LDB_SUCCESS) {
438                 ldb_asprintf_errstring(ldb, "Failed to %s backlink from %s to %s - %s",
439                                        bl->active?"add":"remove",
440                                        ldb_dn_get_linearized(source_dn),
441                                        ldb_dn_get_linearized(target_dn),
442                                        ldb_errstring(ldb));
443                 talloc_free(frame);
444                 return ret;
445         }
446         talloc_free(frame);
447         return ret;
448 }
449
450 /*
451   add a backlink to the list of backlinks to add/delete in the prepare
452   commit
453
454   forward_dn is stolen onto the defereed context
455  */
456 static int replmd_defer_add_backlink(struct ldb_module *module,
457                                      struct replmd_private *replmd_private,
458                                      const struct dsdb_schema *schema,
459                                      struct replmd_replicated_request *ac,
460                                      struct ldb_dn *forward_dn,
461                                      struct GUID *target_guid, bool active,
462                                      const struct dsdb_attribute *schema_attr,
463                                      struct ldb_request *parent)
464 {
465         const struct dsdb_attribute *target_attr;
466         struct la_backlink *bl;
467         
468         bl = talloc(ac, struct la_backlink);
469         if (bl == NULL) {
470                 ldb_module_oom(module);
471                 return LDB_ERR_OPERATIONS_ERROR;
472         }
473
474         target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID ^ 1);
475         if (!target_attr) {
476                 /*
477                  * windows 2003 has a broken schema where the
478                  * definition of msDS-IsDomainFor is missing (which is
479                  * supposed to be the backlink of the
480                  * msDS-HasDomainNCs attribute
481                  */
482                 return LDB_SUCCESS;
483         }
484
485         bl->attr_name = target_attr->lDAPDisplayName;
486         bl->forward_dn = talloc_steal(bl, forward_dn);
487         bl->target_guid = *target_guid;
488         bl->active = active;
489
490         DLIST_ADD(ac->la_backlinks, bl);
491
492         return LDB_SUCCESS;
493 }
494
495 /*
496   add a backlink to the list of backlinks to add/delete in the prepare
497   commit
498  */
499 static int replmd_add_backlink(struct ldb_module *module,
500                                struct replmd_private *replmd_private,
501                                const struct dsdb_schema *schema,
502                                struct ldb_dn *forward_dn,
503                                struct GUID *target_guid, bool active,
504                                const struct dsdb_attribute *schema_attr,
505                                struct ldb_request *parent)
506 {
507         const struct dsdb_attribute *target_attr;
508         struct la_backlink bl;
509         int ret;
510         
511         target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID ^ 1);
512         if (!target_attr) {
513                 /*
514                  * windows 2003 has a broken schema where the
515                  * definition of msDS-IsDomainFor is missing (which is
516                  * supposed to be the backlink of the
517                  * msDS-HasDomainNCs attribute
518                  */
519                 return LDB_SUCCESS;
520         }
521
522         bl.attr_name = target_attr->lDAPDisplayName;
523         bl.forward_dn = forward_dn;
524         bl.target_guid = *target_guid;
525         bl.active = active;
526
527         ret = replmd_process_backlink(module, &bl, parent);
528         return ret;
529 }
530
531
532 /*
533  * Callback for most write operations in this module:
534  *
535  * notify the repl task that a object has changed. The notifies are
536  * gathered up in the replmd_private structure then written to the
537  * @REPLCHANGED object in each partition during the prepare_commit
538  */
539 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
540 {
541         int ret;
542         struct replmd_replicated_request *ac =
543                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
544         struct replmd_private *replmd_private =
545                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
546         struct nc_entry *modified_partition;
547         struct ldb_control *partition_ctrl;
548         const struct dsdb_control_current_partition *partition;
549
550         struct ldb_control **controls;
551
552         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
553
554         controls = ares->controls;
555         if (ldb_request_get_control(ac->req,
556                                     DSDB_CONTROL_CURRENT_PARTITION_OID) == NULL) {
557                 /*
558                  * Remove the current partition control from what we pass up
559                  * the chain if it hasn't been requested manually.
560                  */
561                 controls = ldb_controls_except_specified(ares->controls, ares,
562                                                          partition_ctrl);
563         }
564
565         if (ares->error != LDB_SUCCESS) {
566                 struct GUID_txt_buf guid_txt;
567                 struct ldb_message *msg = NULL;
568                 char *s = NULL;
569
570                 if (ac->apply_mode == false) {
571                         DBG_NOTICE("Originating update failure. Error is: %s\n",
572                                    ldb_strerror(ares->error));
573                         return ldb_module_done(ac->req, controls,
574                                                ares->response, ares->error);
575                 }
576
577                 msg = ac->objs->objects[ac->index_current].msg;
578                 /*
579                  * Set at DBG_NOTICE as once these start to happe, they
580                  * will happen a lot until resolved, due to repeated
581                  * replication.  The caller will probably print the
582                  * ldb error string anyway.
583                  */
584                 DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
585                            ldb_dn_get_linearized(msg->dn),
586                            ldb_strerror(ares->error));
587
588                 s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
589                                                      ac,
590                                                      LDB_CHANGETYPE_ADD,
591                                                      msg);
592
593                 DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
594                          ac->search_msg == NULL ? "ADD" : "MODIFY",
595                          GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
596                                          &guid_txt),
597                          s);
598                 talloc_free(s);
599                 return ldb_module_done(ac->req, controls,
600                                        ares->response, ares->error);
601         }
602
603         if (ares->type != LDB_REPLY_DONE) {
604                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
605                 return ldb_module_done(ac->req, NULL,
606                                        NULL, LDB_ERR_OPERATIONS_ERROR);
607         }
608
609         if (ac->apply_mode == false) {
610                 struct la_backlink *bl;
611                 /*
612                  * process our backlink list after an replmd_add(),
613                  * creating and deleting backlinks as necessary (this
614                  * code is sync).  The other cases are handled inline
615                  * with the modify.
616                  */
617                 for (bl=ac->la_backlinks; bl; bl=bl->next) {
618                         ret = replmd_process_backlink(ac->module, bl, ac->req);
619                         if (ret != LDB_SUCCESS) {
620                                 return ldb_module_done(ac->req, NULL,
621                                                        NULL, ret);
622                         }
623                 }
624         }
625         
626         if (!partition_ctrl) {
627                 ldb_set_errstring(ldb_module_get_ctx(ac->module),"No partition control on reply");
628                 return ldb_module_done(ac->req, NULL,
629                                        NULL, LDB_ERR_OPERATIONS_ERROR);
630         }
631
632         partition = talloc_get_type_abort(partition_ctrl->data,
633                                     struct dsdb_control_current_partition);
634
635         if (ac->seq_num > 0) {
636                 for (modified_partition = replmd_private->ncs; modified_partition;
637                      modified_partition = modified_partition->next) {
638                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
639                                 break;
640                         }
641                 }
642
643                 if (modified_partition == NULL) {
644                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
645                         if (!modified_partition) {
646                                 ldb_oom(ldb_module_get_ctx(ac->module));
647                                 return ldb_module_done(ac->req, NULL,
648                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
649                         }
650                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
651                         if (!modified_partition->dn) {
652                                 ldb_oom(ldb_module_get_ctx(ac->module));
653                                 return ldb_module_done(ac->req, NULL,
654                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
655                         }
656                         DLIST_ADD(replmd_private->ncs, modified_partition);
657                 }
658
659                 if (ac->seq_num > modified_partition->mod_usn) {
660                         modified_partition->mod_usn = ac->seq_num;
661                         if (ac->is_urgent) {
662                                 modified_partition->mod_usn_urgent = ac->seq_num;
663                         }
664                 }
665                 if (!ac->apply_mode) {
666                         replmd_private->originating_updates = true;
667                 }
668         }
669
670         if (ac->apply_mode) {
671                 ret = replmd_replicated_apply_isDeleted(ac);
672                 if (ret != LDB_SUCCESS) {
673                         return ldb_module_done(ac->req, NULL, NULL, ret);
674                 }
675                 return ret;
676         } else {
677                 /* free the partition control container here, for the
678                  * common path.  Other cases will have it cleaned up
679                  * eventually with the ares */
680                 talloc_free(partition_ctrl);
681                 return ldb_module_done(ac->req, controls,
682                                        ares->response, LDB_SUCCESS);
683         }
684 }
685
686
687 /*
688  * update a @REPLCHANGED record in each partition if there have been
689  * any writes of replicated data in the partition
690  */
691 static int replmd_notify_store(struct ldb_module *module, struct ldb_request *parent)
692 {
693         struct replmd_private *replmd_private =
694                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
695
696         while (replmd_private->ncs) {
697                 int ret;
698                 struct nc_entry *modified_partition = replmd_private->ncs;
699
700                 ret = dsdb_module_save_partition_usn(module, modified_partition->dn,
701                                                      modified_partition->mod_usn,
702                                                      modified_partition->mod_usn_urgent, parent);
703                 if (ret != LDB_SUCCESS) {
704                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
705                                  ldb_dn_get_linearized(modified_partition->dn)));
706                         return ret;
707                 }
708
709                 if (ldb_dn_compare(modified_partition->dn,
710                                    replmd_private->schema_dn) == 0) {
711                         struct ldb_result *ext_res;
712                         ret = dsdb_module_extended(module,
713                                                    replmd_private->schema_dn,
714                                                    &ext_res,
715                                                    DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID,
716                                                    ext_res,
717                                                    DSDB_FLAG_NEXT_MODULE,
718                                                    parent);
719                         if (ret != LDB_SUCCESS) {
720                                 return ret;
721                         }
722                         talloc_free(ext_res);
723                 }
724
725                 DLIST_REMOVE(replmd_private->ncs, modified_partition);
726                 talloc_free(modified_partition);
727         }
728
729         return LDB_SUCCESS;
730 }
731
732
733 /*
734   created a replmd_replicated_request context
735  */
736 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
737                                                          struct ldb_request *req)
738 {
739         struct ldb_context *ldb;
740         struct replmd_replicated_request *ac;
741         const struct GUID *our_invocation_id;
742
743         ldb = ldb_module_get_ctx(module);
744
745         ac = talloc_zero(req, struct replmd_replicated_request);
746         if (ac == NULL) {
747                 ldb_oom(ldb);
748                 return NULL;
749         }
750
751         ac->module = module;
752         ac->req = req;
753
754         ac->schema = dsdb_get_schema(ldb, ac);
755         if (!ac->schema) {
756                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
757                               "replmd_modify: no dsdb_schema loaded");
758                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
759                 talloc_free(ac);
760                 return NULL;
761         }
762
763         /* get our invocationId */
764         our_invocation_id = samdb_ntds_invocation_id(ldb);
765         if (!our_invocation_id) {
766                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
767                               "replmd_add: unable to find invocationId\n");
768                 talloc_free(ac);
769                 return NULL;
770         }
771         ac->our_invocation_id = *our_invocation_id;
772
773         return ac;
774 }
775
776 /*
777   add a time element to a record
778 */
779 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
780 {
781         struct ldb_message_element *el;
782         char *s;
783         int ret;
784
785         if (ldb_msg_find_element(msg, attr) != NULL) {
786                 return LDB_SUCCESS;
787         }
788
789         s = ldb_timestring(msg, t);
790         if (s == NULL) {
791                 return LDB_ERR_OPERATIONS_ERROR;
792         }
793
794         ret = ldb_msg_add_string(msg, attr, s);
795         if (ret != LDB_SUCCESS) {
796                 return ret;
797         }
798
799         el = ldb_msg_find_element(msg, attr);
800         /* always set as replace. This works because on add ops, the flag
801            is ignored */
802         el->flags = LDB_FLAG_MOD_REPLACE;
803
804         return LDB_SUCCESS;
805 }
806
807 /*
808   add a uint64_t element to a record
809 */
810 static int add_uint64_element(struct ldb_context *ldb, struct ldb_message *msg,
811                               const char *attr, uint64_t v)
812 {
813         struct ldb_message_element *el;
814         int ret;
815
816         if (ldb_msg_find_element(msg, attr) != NULL) {
817                 return LDB_SUCCESS;
818         }
819
820         ret = samdb_msg_add_uint64(ldb, msg, msg, attr, v);
821         if (ret != LDB_SUCCESS) {
822                 return ret;
823         }
824
825         el = ldb_msg_find_element(msg, attr);
826         /* always set as replace. This works because on add ops, the flag
827            is ignored */
828         el->flags = LDB_FLAG_MOD_REPLACE;
829
830         return LDB_SUCCESS;
831 }
832
833 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
834                                                    const struct replPropertyMetaData1 *m2,
835                                                    const uint32_t *rdn_attid)
836 {
837         /*
838          * This assignment seems inoccous, but it is critical for the
839          * system, as we need to do the comparisons as a unsigned
840          * quantity, not signed (enums are signed integers)
841          */
842         uint32_t attid_1 = m1->attid;
843         uint32_t attid_2 = m2->attid;
844
845         if (attid_1 == attid_2) {
846                 return 0;
847         }
848
849         /*
850          * See above regarding this being an unsigned comparison.
851          * Otherwise when the high bit is set on non-standard
852          * attributes, they would end up first, before objectClass
853          * (0).
854          */
855         return attid_1 > attid_2 ? 1 : -1;
856 }
857
858 static int replmd_replPropertyMetaDataCtr1_verify(struct ldb_context *ldb,
859                                                   struct replPropertyMetaDataCtr1 *ctr1,
860                                                   struct ldb_dn *dn)
861 {
862         if (ctr1->count == 0) {
863                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
864                               "No elements found in replPropertyMetaData for %s!\n",
865                               ldb_dn_get_linearized(dn));
866                 return LDB_ERR_CONSTRAINT_VIOLATION;
867         }
868
869         /* the objectClass attribute is value 0x00000000, so must be first */
870         if (ctr1->array[0].attid != DRSUAPI_ATTID_objectClass) {
871                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
872                               "No objectClass found in replPropertyMetaData for %s!\n",
873                               ldb_dn_get_linearized(dn));
874                 return LDB_ERR_OBJECT_CLASS_VIOLATION;
875         }
876
877         return LDB_SUCCESS;
878 }
879
880 static int replmd_replPropertyMetaDataCtr1_sort_and_verify(struct ldb_context *ldb,
881                                                            struct replPropertyMetaDataCtr1 *ctr1,
882                                                            struct ldb_dn *dn)
883 {
884         /* Note this is O(n^2) for the almost-sorted case, which this is */
885         LDB_TYPESAFE_QSORT(ctr1->array, ctr1->count, NULL,
886                            replmd_replPropertyMetaData1_attid_sort);
887         return replmd_replPropertyMetaDataCtr1_verify(ldb, ctr1, dn);
888 }
889
890 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
891                                                  const struct ldb_message_element *e2,
892                                                  const struct dsdb_schema *schema)
893 {
894         const struct dsdb_attribute *a1;
895         const struct dsdb_attribute *a2;
896
897         /*
898          * TODO: make this faster by caching the dsdb_attribute pointer
899          *       on the ldb_messag_element
900          */
901
902         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
903         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
904
905         /*
906          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
907          *       in the schema
908          */
909         if (!a1 || !a2) {
910                 return strcasecmp(e1->name, e2->name);
911         }
912         if (a1->attributeID_id == a2->attributeID_id) {
913                 return 0;
914         }
915         return a1->attributeID_id > a2->attributeID_id ? 1 : -1;
916 }
917
918 static void replmd_ldb_message_sort(struct ldb_message *msg,
919                                     const struct dsdb_schema *schema)
920 {
921         LDB_TYPESAFE_QSORT(msg->elements, msg->num_elements, schema, replmd_ldb_message_element_attid_sort);
922 }
923
924 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
925                                const struct GUID *invocation_id, uint64_t seq_num,
926                                uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted);
927
928 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
929
930 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
931                           struct ldb_message_element *el, struct parsed_dn **pdn,
932                           const char *ldap_oid, struct ldb_request *parent);
933
934 /*
935   fix up linked attributes in replmd_add.
936   This involves setting up the right meta-data in extended DN
937   components, and creating backlinks to the object
938  */
939 static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
940                              struct replmd_private *replmd_private,
941                              struct ldb_message_element *el,
942                              struct replmd_replicated_request *ac,
943                              NTTIME now,
944                              struct ldb_dn *forward_dn,
945                              const struct dsdb_attribute *sa,
946                              struct ldb_request *parent)
947 {
948         unsigned int i;
949         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
950         struct ldb_context *ldb = ldb_module_get_ctx(module);
951         struct parsed_dn *pdn;
952         /* We will take a reference to the schema in replmd_add_backlink */
953         const struct dsdb_schema *schema = dsdb_get_schema(ldb, NULL);
954         struct ldb_val *new_values = NULL;
955         int ret;
956
957         if (dsdb_check_single_valued_link(sa, el) == LDB_SUCCESS) {
958                 el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
959         } else {
960                 ldb_asprintf_errstring(ldb,
961                                        "Attribute %s is single valued but "
962                                        "more than one value has been supplied",
963                                        el->name);
964                 talloc_free(tmp_ctx);
965                 return LDB_ERR_CONSTRAINT_VIOLATION;
966         }
967         
968         ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
969                              sa->syntax->ldap_oid, parent);
970         if (ret != LDB_SUCCESS) {
971                 talloc_free(tmp_ctx);
972                 return ret;
973         }
974
975         new_values = talloc_array(tmp_ctx, struct ldb_val, el->num_values);
976         if (new_values == NULL) {
977                 ldb_module_oom(module);
978                 talloc_free(tmp_ctx);
979                 return LDB_ERR_OPERATIONS_ERROR;
980         }
981
982         for (i = 0; i < el->num_values; i++) {
983                 struct parsed_dn *p = &pdn[i];
984                 if (i > 0 && parsed_dn_compare(p, &pdn[i - 1]) == 0) {
985                         ldb_asprintf_errstring(ldb,
986                                         "Linked attribute %s has "
987                                         "multiple identical values", el->name);
988                         talloc_free(tmp_ctx);
989                         if (ldb_attr_cmp(el->name, "member") == 0) {
990                                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
991                         } else {
992                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
993                         }
994                 }
995                 ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
996                                           &ac->our_invocation_id,
997                                           ac->seq_num, ac->seq_num, now, 0, false);
998                 if (ret != LDB_SUCCESS) {
999                         talloc_free(tmp_ctx);
1000                         return ret;
1001                 }
1002
1003                 ret = replmd_defer_add_backlink(module, replmd_private,
1004                                                 schema, ac,
1005                                                 forward_dn, &p->guid, true, sa,
1006                                                 parent);
1007                 if (ret != LDB_SUCCESS) {
1008                         talloc_free(tmp_ctx);
1009                         return ret;
1010                 }
1011
1012                 new_values[i] = *p->v;
1013         }
1014         el->values = talloc_steal(mem_ctx, new_values);
1015
1016         talloc_free(tmp_ctx);
1017         return LDB_SUCCESS;
1018 }
1019
1020 static int replmd_add_make_extended_dn(struct ldb_request *req,
1021                                        const DATA_BLOB *guid_blob,
1022                                        struct ldb_dn **_extended_dn)
1023 {
1024         int ret;
1025         const DATA_BLOB *sid_blob;
1026         /* Calculate an extended DN for any linked attributes */
1027         struct ldb_dn *extended_dn = ldb_dn_copy(req, req->op.add.message->dn);
1028         if (!extended_dn) {
1029                 return LDB_ERR_OPERATIONS_ERROR;
1030         }
1031         ret = ldb_dn_set_extended_component(extended_dn, "GUID", guid_blob);
1032         if (ret != LDB_SUCCESS) {
1033                 return ret;
1034         }
1035
1036         sid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectSID");
1037         if (sid_blob != NULL) {
1038                 ret = ldb_dn_set_extended_component(extended_dn, "SID", sid_blob);
1039                 if (ret != LDB_SUCCESS) {
1040                         return ret;
1041                 }
1042         }
1043         *_extended_dn = extended_dn;
1044         return LDB_SUCCESS;
1045 }
1046
1047 /*
1048   intercept add requests
1049  */
1050 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
1051 {
1052         struct ldb_context *ldb;
1053         struct ldb_control *control;
1054         struct replmd_replicated_request *ac;
1055         enum ndr_err_code ndr_err;
1056         struct ldb_request *down_req;
1057         struct ldb_message *msg;
1058         const DATA_BLOB *guid_blob;
1059         DATA_BLOB guid_blob_stack;
1060         struct GUID guid;
1061         uint8_t guid_data[16];
1062         struct replPropertyMetaDataBlob nmd;
1063         struct ldb_val nmd_value;
1064         struct ldb_dn *extended_dn = NULL;
1065         
1066         /*
1067          * The use of a time_t here seems odd, but as the NTTIME
1068          * elements are actually declared as NTTIME_1sec in the IDL,
1069          * getting a higher resolution timestamp is not required.
1070          */
1071         time_t t = time(NULL);
1072         NTTIME now;
1073         char *time_str;
1074         int ret;
1075         unsigned int i;
1076         unsigned int functional_level;
1077         uint32_t ni=0;
1078         bool allow_add_guid = false;
1079         bool remove_current_guid = false;
1080         bool is_urgent = false;
1081         bool is_schema_nc = false;
1082         struct ldb_message_element *objectclass_el;
1083         struct replmd_private *replmd_private =
1084                 talloc_get_type_abort(ldb_module_get_private(module), struct replmd_private);
1085
1086         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
1087         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
1088         if (control) {
1089                 allow_add_guid = true;
1090         }
1091
1092         /* do not manipulate our control entries */
1093         if (ldb_dn_is_special(req->op.add.message->dn)) {
1094                 return ldb_next_request(module, req);
1095         }
1096
1097         ldb = ldb_module_get_ctx(module);
1098
1099         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
1100
1101         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
1102         if (guid_blob != NULL) {
1103                 if (!allow_add_guid) {
1104                         ldb_set_errstring(ldb,
1105                                           "replmd_add: it's not allowed to add an object with objectGUID!");
1106                         return LDB_ERR_UNWILLING_TO_PERFORM;
1107                 } else {
1108                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
1109                         if (!NT_STATUS_IS_OK(status)) {
1110                                 ldb_set_errstring(ldb,
1111                                                   "replmd_add: Unable to parse the 'objectGUID' as a GUID!");
1112                                 return LDB_ERR_UNWILLING_TO_PERFORM;
1113                         }
1114                         /* we remove this attribute as it can be a string and
1115                          * will not be treated correctly and then we will re-add
1116                          * it later on in the good format */
1117                         remove_current_guid = true;
1118                 }
1119         } else {
1120                 /* a new GUID */
1121                 guid = GUID_random();
1122                 
1123                 guid_blob_stack = data_blob_const(guid_data, sizeof(guid_data));
1124                 
1125                 /* This can't fail */
1126                 ndr_push_struct_into_fixed_blob(&guid_blob_stack, &guid,
1127                                                 (ndr_push_flags_fn_t)ndr_push_GUID);
1128                 guid_blob = &guid_blob_stack;
1129         }
1130
1131         ac = replmd_ctx_init(module, req);
1132         if (ac == NULL) {
1133                 return ldb_module_oom(module);
1134         }
1135
1136         functional_level = dsdb_functional_level(ldb);
1137
1138         /* Get a sequence number from the backend */
1139         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
1140         if (ret != LDB_SUCCESS) {
1141                 talloc_free(ac);
1142                 return ret;
1143         }
1144
1145         /* we have to copy the message as the caller might have it as a const */
1146         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
1147         if (msg == NULL) {
1148                 ldb_oom(ldb);
1149                 talloc_free(ac);
1150                 return LDB_ERR_OPERATIONS_ERROR;
1151         }
1152
1153         /* generated times */
1154         unix_to_nt_time(&now, t);
1155         time_str = ldb_timestring(msg, t);
1156         if (!time_str) {
1157                 ldb_oom(ldb);
1158                 talloc_free(ac);
1159                 return LDB_ERR_OPERATIONS_ERROR;
1160         }
1161         if (remove_current_guid) {
1162                 ldb_msg_remove_attr(msg,"objectGUID");
1163         }
1164
1165         /*
1166          * remove autogenerated attributes
1167          */
1168         ldb_msg_remove_attr(msg, "whenCreated");
1169         ldb_msg_remove_attr(msg, "whenChanged");
1170         ldb_msg_remove_attr(msg, "uSNCreated");
1171         ldb_msg_remove_attr(msg, "uSNChanged");
1172         ldb_msg_remove_attr(msg, "replPropertyMetaData");
1173
1174         /*
1175          * readd replicated attributes
1176          */
1177         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
1178         if (ret != LDB_SUCCESS) {
1179                 ldb_oom(ldb);
1180                 talloc_free(ac);
1181                 return ret;
1182         }
1183
1184         /* build the replication meta_data */
1185         ZERO_STRUCT(nmd);
1186         nmd.version             = 1;
1187         nmd.ctr.ctr1.count      = msg->num_elements;
1188         nmd.ctr.ctr1.array      = talloc_array(msg,
1189                                                struct replPropertyMetaData1,
1190                                                nmd.ctr.ctr1.count);
1191         if (!nmd.ctr.ctr1.array) {
1192                 ldb_oom(ldb);
1193                 talloc_free(ac);
1194                 return LDB_ERR_OPERATIONS_ERROR;
1195         }
1196
1197         is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
1198
1199         for (i=0; i < msg->num_elements;) {
1200                 struct ldb_message_element *e = &msg->elements[i];
1201                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
1202                 const struct dsdb_attribute *sa;
1203
1204                 if (e->name[0] == '@') {
1205                         i++;
1206                         continue;
1207                 }
1208
1209                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
1210                 if (!sa) {
1211                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
1212                                       "replmd_add: attribute '%s' not defined in schema\n",
1213                                       e->name);
1214                         talloc_free(ac);
1215                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1216                 }
1217
1218                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
1219                         /* if the attribute is not replicated (0x00000001)
1220                          * or constructed (0x00000004) it has no metadata
1221                          */
1222                         i++;
1223                         continue;
1224                 }
1225
1226                 if (sa->linkID != 0 && functional_level > DS_DOMAIN_FUNCTION_2000) {
1227                         if (extended_dn == NULL) {
1228                                 ret = replmd_add_make_extended_dn(req,
1229                                                                   guid_blob,
1230                                                                   &extended_dn);
1231                                 if (ret != LDB_SUCCESS) {
1232                                         talloc_free(ac);
1233                                         return ret;
1234                                 }
1235                         }                       
1236
1237                         /*
1238                          * Prepare the context for the backlinks and
1239                          * create metadata for the forward links.  The
1240                          * backlinks are created in
1241                          * replmd_op_callback() after the successful
1242                          * ADD of the object.
1243                          */
1244                         ret = replmd_add_fix_la(module, msg->elements,
1245                                                 replmd_private, e,
1246                                                 ac, now,
1247                                                 extended_dn,
1248                                                 sa, req);
1249                         if (ret != LDB_SUCCESS) {
1250                                 talloc_free(ac);
1251                                 return ret;
1252                         }
1253                         /* linked attributes are not stored in
1254                            replPropertyMetaData in FL above w2k */
1255                         i++;
1256                         continue;
1257                 }
1258
1259                 m->attid   = dsdb_attribute_get_attid(sa, is_schema_nc);
1260                 m->version = 1;
1261                 if (m->attid == DRSUAPI_ATTID_isDeleted) {
1262                         const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
1263                         const char* rdn;
1264
1265                         if (rdn_val == NULL) {
1266                                 ldb_oom(ldb);
1267                                 talloc_free(ac);
1268                                 return LDB_ERR_OPERATIONS_ERROR;
1269                         }
1270
1271                         rdn = (const char*)rdn_val->data;
1272                         if (strcmp(rdn, "Deleted Objects") == 0) {
1273                                 /*
1274                                  * Set the originating_change_time to 29/12/9999 at 23:59:59
1275                                  * as specified in MS-ADTS 7.1.1.4.2 Deleted Objects Container
1276                                  */
1277                                 m->originating_change_time      = DELETED_OBJECT_CONTAINER_CHANGE_TIME;
1278                         } else {
1279                                 m->originating_change_time      = now;
1280                         }
1281                 } else {
1282                         m->originating_change_time      = now;
1283                 }
1284                 m->originating_invocation_id    = ac->our_invocation_id;
1285                 m->originating_usn              = ac->seq_num;
1286                 m->local_usn                    = ac->seq_num;
1287                 ni++;
1288
1289                 if (!(e->flags & DSDB_FLAG_INTERNAL_FORCE_META_DATA)) {
1290                         i++;
1291                         continue;
1292                 }
1293
1294                 e->flags &= ~DSDB_FLAG_INTERNAL_FORCE_META_DATA;
1295
1296                 if (e->num_values != 0) {
1297                         i++;
1298                         continue;
1299                 }
1300
1301                 ldb_msg_remove_element(msg, e);
1302         }
1303
1304         /* fix meta data count */
1305         nmd.ctr.ctr1.count = ni;
1306
1307         /*
1308          * sort meta data array
1309          */
1310         ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, msg->dn);
1311         if (ret != LDB_SUCCESS) {
1312                 ldb_asprintf_errstring(ldb, "%s: error during direct ADD: %s", __func__, ldb_errstring(ldb));
1313                 talloc_free(ac);
1314                 return ret;
1315         }
1316
1317         /* generated NDR encoded values */
1318         ndr_err = ndr_push_struct_blob(&nmd_value, msg,
1319                                        &nmd,
1320                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1321         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1322                 ldb_oom(ldb);
1323                 talloc_free(ac);
1324                 return LDB_ERR_OPERATIONS_ERROR;
1325         }
1326
1327         /*
1328          * add the autogenerated values
1329          */
1330         ret = dsdb_msg_add_guid(msg, &guid, "objectGUID");
1331         if (ret != LDB_SUCCESS) {
1332                 ldb_oom(ldb);
1333                 talloc_free(ac);
1334                 return ret;
1335         }
1336         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
1337         if (ret != LDB_SUCCESS) {
1338                 ldb_oom(ldb);
1339                 talloc_free(ac);
1340                 return ret;
1341         }
1342         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
1343         if (ret != LDB_SUCCESS) {
1344                 ldb_oom(ldb);
1345                 talloc_free(ac);
1346                 return ret;
1347         }
1348         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
1349         if (ret != LDB_SUCCESS) {
1350                 ldb_oom(ldb);
1351                 talloc_free(ac);
1352                 return ret;
1353         }
1354         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1355         if (ret != LDB_SUCCESS) {
1356                 ldb_oom(ldb);
1357                 talloc_free(ac);
1358                 return ret;
1359         }
1360
1361         /*
1362          * sort the attributes by attid before storing the object
1363          */
1364         replmd_ldb_message_sort(msg, ac->schema);
1365
1366         /*
1367          * Assert that we do have an objectClass
1368          */
1369         objectclass_el = ldb_msg_find_element(msg, "objectClass");
1370         if (objectclass_el == NULL) {
1371                 ldb_asprintf_errstring(ldb, __location__
1372                                        ": objectClass missing on %s\n",
1373                                        ldb_dn_get_linearized(msg->dn));
1374                 talloc_free(ac);
1375                 return LDB_ERR_OBJECT_CLASS_VIOLATION;
1376         }
1377         is_urgent = replmd_check_urgent_objectclass(objectclass_el,
1378                                                         REPL_URGENT_ON_CREATE);
1379
1380         ac->is_urgent = is_urgent;
1381         ret = ldb_build_add_req(&down_req, ldb, ac,
1382                                 msg,
1383                                 req->controls,
1384                                 ac, replmd_op_callback,
1385                                 req);
1386
1387         LDB_REQ_SET_LOCATION(down_req);
1388         if (ret != LDB_SUCCESS) {
1389                 talloc_free(ac);
1390                 return ret;
1391         }
1392
1393         /* current partition control is needed by "replmd_op_callback" */
1394         if (ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID) == NULL) {
1395                 ret = ldb_request_add_control(down_req,
1396                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1397                                               false, NULL);
1398                 if (ret != LDB_SUCCESS) {
1399                         talloc_free(ac);
1400                         return ret;
1401                 }
1402         }
1403
1404         if (functional_level == DS_DOMAIN_FUNCTION_2000) {
1405                 ret = ldb_request_add_control(down_req, DSDB_CONTROL_APPLY_LINKS, false, NULL);
1406                 if (ret != LDB_SUCCESS) {
1407                         talloc_free(ac);
1408                         return ret;
1409                 }
1410         }
1411
1412         /* mark the control done */
1413         if (control) {
1414                 control->critical = 0;
1415         }
1416         /* go on with the call chain */
1417         return ldb_next_request(module, down_req);
1418 }
1419
1420
1421 /*
1422  * update the replPropertyMetaData for one element
1423  */
1424 static int replmd_update_rpmd_element(struct ldb_context *ldb,
1425                                       struct ldb_message *msg,
1426                                       struct ldb_message_element *el,
1427                                       struct ldb_message_element *old_el,
1428                                       struct replPropertyMetaDataBlob *omd,
1429                                       const struct dsdb_schema *schema,
1430                                       uint64_t *seq_num,
1431                                       const struct GUID *our_invocation_id,
1432                                       NTTIME now,
1433                                       bool is_schema_nc,
1434                                       bool is_forced_rodc,
1435                                       struct ldb_request *req)
1436 {
1437         uint32_t i;
1438         const struct dsdb_attribute *a;
1439         struct replPropertyMetaData1 *md1;
1440         bool may_skip = false;
1441         uint32_t attid;
1442
1443         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
1444         if (a == NULL) {
1445                 if (ldb_request_get_control(req, LDB_CONTROL_RELAX_OID)) {
1446                         /* allow this to make it possible for dbcheck
1447                            to remove bad attributes */
1448                         return LDB_SUCCESS;
1449                 }
1450
1451                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
1452                          el->name));
1453                 return LDB_ERR_OPERATIONS_ERROR;
1454         }
1455
1456         attid = dsdb_attribute_get_attid(a, is_schema_nc);
1457
1458         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
1459                 return LDB_SUCCESS;
1460         }
1461
1462         /*
1463          * if the attribute's value haven't changed, and this isn't
1464          * just a delete of everything then return LDB_SUCCESS Unless
1465          * we have the provision control or if the attribute is
1466          * interSiteTopologyGenerator as this page explain:
1467          * http://support.microsoft.com/kb/224815 this attribute is
1468          * periodicaly written by the DC responsible for the intersite
1469          * generation in a given site
1470          *
1471          * Unchanged could be deleting or replacing an already-gone
1472          * thing with an unconstrained delete/empty replace or a
1473          * replace with the same value, but not an add with the same
1474          * value because that could be about adding a duplicate (which
1475          * is for someone else to error out on).
1476          */
1477         if (old_el != NULL && ldb_msg_element_equal_ordered(el, old_el)) {
1478                 if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_REPLACE) {
1479                         may_skip = true;
1480                 }
1481         } else if (old_el == NULL && el->num_values == 0) {
1482                 if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_REPLACE) {
1483                         may_skip = true;
1484                 } else if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE) {
1485                         may_skip = true;
1486                 }
1487         } else if (a->linkID != 0 && LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
1488                    ldb_request_get_control(req, DSDB_CONTROL_REPLMD_VANISH_LINKS) != NULL) {
1489                 /*
1490                  * We intentionally skip the version bump when attempting to
1491                  * vanish links.
1492                  *
1493                  * The control is set by dbcheck and expunge-tombstones which
1494                  * both attempt to be non-replicating. Otherwise, making an
1495                  * alteration to the replication state would trigger a
1496                  * broadcast of all expunged objects.
1497                  */
1498                 may_skip = true;
1499         }
1500
1501         if (el->flags & DSDB_FLAG_INTERNAL_FORCE_META_DATA) {
1502                 may_skip = false;
1503                 el->flags &= ~DSDB_FLAG_INTERNAL_FORCE_META_DATA;
1504         }
1505
1506         if (may_skip) {
1507                 if (strcmp(el->name, "interSiteTopologyGenerator") != 0 &&
1508                     !ldb_request_get_control(req, LDB_CONTROL_PROVISION_OID)) {
1509                         /*
1510                          * allow this to make it possible for dbcheck
1511                          * to rebuild broken metadata
1512                          */
1513                         return LDB_SUCCESS;
1514                 }
1515         }
1516
1517         for (i=0; i<omd->ctr.ctr1.count; i++) {
1518                 /*
1519                  * First check if we find it under the msDS-IntID,
1520                  * then check if we find it under the OID and
1521                  * prefixMap ID.
1522                  *
1523                  * This allows the administrator to simply re-write
1524                  * the attributes and so restore replication, which is
1525                  * likely what they will try to do.
1526                  */
1527                 if (attid == omd->ctr.ctr1.array[i].attid) {
1528                         break;
1529                 }
1530
1531                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) {
1532                         break;
1533                 }
1534         }
1535
1536         if (a->linkID != 0 && dsdb_functional_level(ldb) > DS_DOMAIN_FUNCTION_2000) {
1537                 /* linked attributes are not stored in
1538                    replPropertyMetaData in FL above w2k, but we do
1539                    raise the seqnum for the object  */
1540                 if (*seq_num == 0 &&
1541                     ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num) != LDB_SUCCESS) {
1542                         return LDB_ERR_OPERATIONS_ERROR;
1543                 }
1544                 return LDB_SUCCESS;
1545         }
1546
1547         if (i == omd->ctr.ctr1.count) {
1548                 /* we need to add a new one */
1549                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array,
1550                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
1551                 if (omd->ctr.ctr1.array == NULL) {
1552                         ldb_oom(ldb);
1553                         return LDB_ERR_OPERATIONS_ERROR;
1554                 }
1555                 omd->ctr.ctr1.count++;
1556                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
1557         }
1558
1559         /* Get a new sequence number from the backend. We only do this
1560          * if we have a change that requires a new
1561          * replPropertyMetaData element
1562          */
1563         if (*seq_num == 0) {
1564                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
1565                 if (ret != LDB_SUCCESS) {
1566                         return LDB_ERR_OPERATIONS_ERROR;
1567                 }
1568         }
1569
1570         md1 = &omd->ctr.ctr1.array[i];
1571         md1->version++;
1572         md1->attid = attid;
1573
1574         if (md1->attid == DRSUAPI_ATTID_isDeleted) {
1575                 const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
1576                 const char* rdn;
1577
1578                 if (rdn_val == NULL) {
1579                         ldb_oom(ldb);
1580                         return LDB_ERR_OPERATIONS_ERROR;
1581                 }
1582
1583                 rdn = (const char*)rdn_val->data;
1584                 if (strcmp(rdn, "Deleted Objects") == 0) {
1585                         /*
1586                          * Set the originating_change_time to 29/12/9999 at 23:59:59
1587                          * as specified in MS-ADTS 7.1.1.4.2 Deleted Objects Container
1588                          */
1589                         md1->originating_change_time    = DELETED_OBJECT_CONTAINER_CHANGE_TIME;
1590                 } else {
1591                         md1->originating_change_time    = now;
1592                 }
1593         } else {
1594                 md1->originating_change_time    = now;
1595         }
1596         md1->originating_invocation_id = *our_invocation_id;
1597         md1->originating_usn           = *seq_num;
1598         md1->local_usn                 = *seq_num;
1599
1600         if (is_forced_rodc) {
1601                 /* Force version to 0 to be overriden later via replication */
1602                 md1->version = 0;
1603         }
1604
1605         return LDB_SUCCESS;
1606 }
1607
1608 /*
1609  * Bump the replPropertyMetaData version on an attribute, and if it
1610  * has changed (or forced by leaving rdn_old NULL), update the value
1611  * in the entry.
1612  *
1613  * This is important, as calling a modify operation may not change the
1614  * version number if the values appear unchanged, but a rename between
1615  * parents bumps this value.
1616  *
1617  */
1618 static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
1619                                        struct ldb_message *msg,
1620                                        const struct ldb_val *rdn_new,
1621                                        const struct ldb_val *rdn_old,
1622                                        struct replPropertyMetaDataBlob *omd,
1623                                        struct replmd_replicated_request *ar,
1624                                        NTTIME now,
1625                                        bool is_schema_nc,
1626                                        bool is_forced_rodc)
1627 {
1628         const char *rdn_name = ldb_dn_get_rdn_name(msg->dn);
1629         const struct dsdb_attribute *rdn_attr =
1630                 dsdb_attribute_by_lDAPDisplayName(ar->schema, rdn_name);
1631         const char *attr_name = rdn_attr != NULL ?
1632                                 rdn_attr->lDAPDisplayName :
1633                                 rdn_name;
1634         struct ldb_message_element new_el = {
1635                 .flags = LDB_FLAG_MOD_REPLACE,
1636                 .name = attr_name,
1637                 .num_values = 1,
1638                 .values = discard_const_p(struct ldb_val, rdn_new)
1639         };
1640         struct ldb_message_element old_el = {
1641                 .flags = LDB_FLAG_MOD_REPLACE,
1642                 .name = attr_name,
1643                 .num_values = rdn_old ? 1 : 0,
1644                 .values = discard_const_p(struct ldb_val, rdn_old)
1645         };
1646
1647         if (ldb_msg_element_equal_ordered(&new_el, &old_el) == false) {
1648                 int ret = ldb_msg_add(msg, &new_el, LDB_FLAG_MOD_REPLACE);
1649                 if (ret != LDB_SUCCESS) {
1650                         return ldb_oom(ldb);
1651                 }
1652         }
1653
1654         return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
1655                                           omd, ar->schema, &ar->seq_num,
1656                                           &ar->our_invocation_id,
1657                                           now, is_schema_nc, is_forced_rodc,
1658                                           ar->req);
1659
1660 }
1661
1662 static uint64_t find_max_local_usn(struct replPropertyMetaDataBlob omd)
1663 {
1664         uint32_t count = omd.ctr.ctr1.count;
1665         uint64_t max = 0;
1666         uint32_t i;
1667         for (i=0; i < count; i++) {
1668                 struct replPropertyMetaData1 m = omd.ctr.ctr1.array[i];
1669                 if (max < m.local_usn) {
1670                         max = m.local_usn;
1671                 }
1672         }
1673         return max;
1674 }
1675
1676 /*
1677  * update the replPropertyMetaData object each time we modify an
1678  * object. This is needed for DRS replication, as the merge on the
1679  * client is based on this object
1680  */
1681 static int replmd_update_rpmd(struct ldb_module *module,
1682                               const struct dsdb_schema *schema,
1683                               struct ldb_request *req,
1684                               const char * const *rename_attrs,
1685                               struct ldb_message *msg, uint64_t *seq_num,
1686                               time_t t, bool is_schema_nc,
1687                               bool *is_urgent, bool *rodc)
1688 {
1689         const struct ldb_val *omd_value;
1690         enum ndr_err_code ndr_err;
1691         struct replPropertyMetaDataBlob omd;
1692         unsigned int i;
1693         NTTIME now;
1694         const struct GUID *our_invocation_id;
1695         int ret;
1696         const char * const *attrs = NULL;
1697         const char * const attrs2[] = { "uSNChanged", "objectClass", "instanceType", NULL };
1698         struct ldb_result *res;
1699         struct ldb_context *ldb;
1700         struct ldb_message_element *objectclass_el;
1701         enum urgent_situation situation;
1702         bool rmd_is_provided;
1703         bool rmd_is_just_resorted = false;
1704         const char *not_rename_attrs[4 + msg->num_elements];
1705         bool is_forced_rodc = false;
1706
1707         if (rename_attrs) {
1708                 attrs = rename_attrs;
1709         } else {
1710                 for (i = 0; i < msg->num_elements; i++) {
1711                         not_rename_attrs[i] = msg->elements[i].name;
1712                 }
1713                 not_rename_attrs[i] = "replPropertyMetaData";
1714                 not_rename_attrs[i+1] = "objectClass";
1715                 not_rename_attrs[i+2] = "instanceType";
1716                 not_rename_attrs[i+3] = NULL;
1717                 attrs = not_rename_attrs;
1718         }
1719
1720         ldb = ldb_module_get_ctx(module);
1721
1722         ret = samdb_rodc(ldb, rodc);
1723         if (ret != LDB_SUCCESS) {
1724                 DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
1725                 *rodc = false;
1726         }
1727
1728         if (*rodc &&
1729             ldb_request_get_control(req, DSDB_CONTROL_FORCE_RODC_LOCAL_CHANGE)) {
1730                 is_forced_rodc = true;
1731         }
1732
1733         our_invocation_id = samdb_ntds_invocation_id(ldb);
1734         if (!our_invocation_id) {
1735                 /* this happens during an initial vampire while
1736                    updating the schema */
1737                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
1738                 return LDB_SUCCESS;
1739         }
1740
1741         unix_to_nt_time(&now, t);
1742
1743         if (ldb_request_get_control(req, DSDB_CONTROL_CHANGEREPLMETADATA_OID)) {
1744                 rmd_is_provided = true;
1745                 if (ldb_request_get_control(req, DSDB_CONTROL_CHANGEREPLMETADATA_RESORT_OID)) {
1746                         rmd_is_just_resorted = true;
1747                 }
1748         } else {
1749                 rmd_is_provided = false;
1750         }
1751
1752         /* if isDeleted is present and is TRUE, then we consider we are deleting,
1753          * otherwise we consider we are updating */
1754         if (ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")) {
1755                 situation = REPL_URGENT_ON_DELETE;
1756         } else if (rename_attrs) {
1757                 situation = REPL_URGENT_ON_CREATE | REPL_URGENT_ON_DELETE;
1758         } else {
1759                 situation = REPL_URGENT_ON_UPDATE;
1760         }
1761
1762         if (rmd_is_provided) {
1763                 /* In this case the change_replmetadata control was supplied */
1764                 /* We check that it's the only attribute that is provided
1765                  * (it's a rare case so it's better to keep the code simplier)
1766                  * We also check that the highest local_usn is bigger or the same as
1767                  * uSNChanged. */
1768                 uint64_t db_seq;
1769                 if( msg->num_elements != 1 ||
1770                         strncmp(msg->elements[0].name,
1771                                 "replPropertyMetaData", 20) ) {
1772                         DEBUG(0,(__location__ ": changereplmetada control called without "\
1773                                 "a specified replPropertyMetaData attribute or with others\n"));
1774                         return LDB_ERR_OPERATIONS_ERROR;
1775                 }
1776                 if (situation != REPL_URGENT_ON_UPDATE) {
1777                         DEBUG(0,(__location__ ": changereplmetada control can't be called when deleting an object\n"));
1778                         return LDB_ERR_OPERATIONS_ERROR;
1779                 }
1780                 omd_value = ldb_msg_find_ldb_val(msg, "replPropertyMetaData");
1781                 if (!omd_value) {
1782                         DEBUG(0,(__location__ ": replPropertyMetaData was not specified for Object %s\n",
1783                                  ldb_dn_get_linearized(msg->dn)));
1784                         return LDB_ERR_OPERATIONS_ERROR;
1785                 }
1786                 ndr_err = ndr_pull_struct_blob(omd_value, msg, &omd,
1787                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1788                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1789                         DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
1790                                  ldb_dn_get_linearized(msg->dn)));
1791                         return LDB_ERR_OPERATIONS_ERROR;
1792                 }
1793
1794                 ret = dsdb_module_search_dn(module, msg, &res, msg->dn, attrs2,
1795                                             DSDB_FLAG_NEXT_MODULE |
1796                                             DSDB_SEARCH_SHOW_RECYCLED |
1797                                             DSDB_SEARCH_SHOW_EXTENDED_DN |
1798                                             DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
1799                                             DSDB_SEARCH_REVEAL_INTERNALS, req);
1800
1801                 if (ret != LDB_SUCCESS) {
1802                         return ret;
1803                 }
1804
1805                 if (rmd_is_just_resorted == false) {
1806                         *seq_num = find_max_local_usn(omd);
1807
1808                         db_seq = ldb_msg_find_attr_as_uint64(res->msgs[0], "uSNChanged", 0);
1809
1810                         /*
1811                          * The test here now allows for a new
1812                          * replPropertyMetaData with no change, if was
1813                          * just dbcheck re-sorting the values.
1814                          */
1815                         if (*seq_num <= db_seq) {
1816                                 DEBUG(0,(__location__ ": changereplmetada control provided but max(local_usn)" \
1817                                          " is less than uSNChanged (max = %lld uSNChanged = %lld)\n",
1818                                          (long long)*seq_num, (long long)db_seq));
1819                                 return LDB_ERR_OPERATIONS_ERROR;
1820                         }
1821                 }
1822
1823         } else {
1824                 /* search for the existing replPropertyMetaDataBlob. We need
1825                  * to use REVEAL and ask for DNs in storage format to support
1826                  * the check for values being the same in
1827                  * replmd_update_rpmd_element()
1828                  */
1829                 ret = dsdb_module_search_dn(module, msg, &res, msg->dn, attrs,
1830                                             DSDB_FLAG_NEXT_MODULE |
1831                                             DSDB_SEARCH_SHOW_RECYCLED |
1832                                             DSDB_SEARCH_SHOW_EXTENDED_DN |
1833                                             DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
1834                                             DSDB_SEARCH_REVEAL_INTERNALS, req);
1835                 if (ret != LDB_SUCCESS) {
1836                         return ret;
1837                 }
1838
1839                 omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
1840                 if (!omd_value) {
1841                         DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
1842                                  ldb_dn_get_linearized(msg->dn)));
1843                         return LDB_ERR_OPERATIONS_ERROR;
1844                 }
1845
1846                 ndr_err = ndr_pull_struct_blob(omd_value, msg, &omd,
1847                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1848                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1849                         DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
1850                                  ldb_dn_get_linearized(msg->dn)));
1851                         return LDB_ERR_OPERATIONS_ERROR;
1852                 }
1853
1854                 if (omd.version != 1) {
1855                         DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
1856                                  omd.version, ldb_dn_get_linearized(msg->dn)));
1857                         return LDB_ERR_OPERATIONS_ERROR;
1858                 }
1859
1860                 for (i=0; i<msg->num_elements;) {
1861                         struct ldb_message_element *el = &msg->elements[i];
1862                         struct ldb_message_element *old_el;
1863
1864                         old_el = ldb_msg_find_element(res->msgs[0], el->name);
1865                         ret = replmd_update_rpmd_element(ldb, msg, el, old_el,
1866                                                          &omd, schema, seq_num,
1867                                                          our_invocation_id,
1868                                                          now, is_schema_nc,
1869                                                          is_forced_rodc,
1870                                                          req);
1871                         if (ret != LDB_SUCCESS) {
1872                                 return ret;
1873                         }
1874
1875                         if (!*is_urgent && (situation == REPL_URGENT_ON_UPDATE)) {
1876                                 *is_urgent = replmd_check_urgent_attribute(el);
1877                         }
1878
1879                         if (!(el->flags & DSDB_FLAG_INTERNAL_FORCE_META_DATA)) {
1880                                 i++;
1881                                 continue;
1882                         }
1883
1884                         el->flags &= ~DSDB_FLAG_INTERNAL_FORCE_META_DATA;
1885
1886                         if (el->num_values != 0) {
1887                                 i++;
1888                                 continue;
1889                         }
1890
1891                         ldb_msg_remove_element(msg, el);
1892                 }
1893         }
1894
1895         /*
1896          * Assert that we have an objectClass attribute - this is major
1897          * corruption if we don't have this!
1898          */
1899         objectclass_el = ldb_msg_find_element(res->msgs[0], "objectClass");
1900         if (objectclass_el != NULL) {
1901                 /*
1902                  * Now check if this objectClass means we need to do urgent replication
1903                  */
1904                 if (!*is_urgent && replmd_check_urgent_objectclass(objectclass_el,
1905                                                                    situation)) {
1906                         *is_urgent = true;
1907                 }
1908         } else if (!ldb_request_get_control(req, DSDB_CONTROL_DBCHECK)) {
1909                 ldb_asprintf_errstring(ldb, __location__
1910                                        ": objectClass missing on %s\n",
1911                                        ldb_dn_get_linearized(msg->dn));
1912                 return LDB_ERR_OBJECT_CLASS_VIOLATION;
1913         }
1914
1915         /*
1916          * replmd_update_rpmd_element has done an update if the
1917          * seq_num is set
1918          */
1919         if (*seq_num != 0 || rmd_is_just_resorted == true) {
1920                 struct ldb_val *md_value;
1921                 struct ldb_message_element *el;
1922
1923                 /*if we are RODC and this is a DRSR update then its ok*/
1924                 if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)
1925                     && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)
1926                     && !is_forced_rodc) {
1927                         unsigned instanceType;
1928
1929                         if (*rodc) {
1930                                 ldb_set_errstring(ldb, "RODC modify is forbidden!");
1931                                 return LDB_ERR_REFERRAL;
1932                         }
1933
1934                         instanceType = ldb_msg_find_attr_as_uint(res->msgs[0], "instanceType", INSTANCE_TYPE_WRITE);
1935                         if (!(instanceType & INSTANCE_TYPE_WRITE)) {
1936                                 return ldb_error(ldb, LDB_ERR_UNWILLING_TO_PERFORM,
1937                                                  "cannot change replicated attribute on partial replica");
1938                         }
1939                 }
1940
1941                 md_value = talloc(msg, struct ldb_val);
1942                 if (md_value == NULL) {
1943                         ldb_oom(ldb);
1944                         return LDB_ERR_OPERATIONS_ERROR;
1945                 }
1946
1947                 ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &omd.ctr.ctr1, msg->dn);
1948                 if (ret != LDB_SUCCESS) {
1949                         ldb_asprintf_errstring(ldb, "%s: %s", __func__, ldb_errstring(ldb));
1950                         return ret;
1951                 }
1952
1953                 ndr_err = ndr_push_struct_blob(md_value, msg, &omd,
1954                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1955                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1956                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
1957                                  ldb_dn_get_linearized(msg->dn)));
1958                         return LDB_ERR_OPERATIONS_ERROR;
1959                 }
1960
1961                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
1962                 if (ret != LDB_SUCCESS) {
1963                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
1964                                  ldb_dn_get_linearized(msg->dn)));
1965                         return ret;
1966                 }
1967
1968                 el->num_values = 1;
1969                 el->values = md_value;
1970         }
1971
1972         return LDB_SUCCESS;
1973 }
1974
1975 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
1976 {
1977         int ret = ndr_guid_compare(&pdn1->guid, &pdn2->guid);
1978         if (ret == 0) {
1979                 return data_blob_cmp(&pdn1->dsdb_dn->extra_part,
1980                                      &pdn2->dsdb_dn->extra_part);
1981         }
1982         return ret;
1983 }
1984
1985 /*
1986   get a series of message element values as an array of DNs and GUIDs
1987   the result is sorted by GUID
1988  */
1989 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1990                           struct ldb_message_element *el, struct parsed_dn **pdn,
1991                           const char *ldap_oid, struct ldb_request *parent)
1992 {
1993         unsigned int i;
1994         bool values_are_sorted = true;
1995         struct ldb_context *ldb = ldb_module_get_ctx(module);
1996
1997         if (el == NULL) {
1998                 *pdn = NULL;
1999                 return LDB_SUCCESS;
2000         }
2001
2002         (*pdn) = talloc_array(mem_ctx, struct parsed_dn, el->num_values);
2003         if (!*pdn) {
2004                 ldb_module_oom(module);
2005                 return LDB_ERR_OPERATIONS_ERROR;
2006         }
2007
2008         for (i=0; i<el->num_values; i++) {
2009                 struct ldb_val *v = &el->values[i];
2010                 NTSTATUS status;
2011                 struct ldb_dn *dn;
2012                 struct parsed_dn *p;
2013
2014                 p = &(*pdn)[i];
2015
2016                 p->dsdb_dn = dsdb_dn_parse(*pdn, ldb, v, ldap_oid);
2017                 if (p->dsdb_dn == NULL) {
2018                         return LDB_ERR_INVALID_DN_SYNTAX;
2019                 }
2020
2021                 dn = p->dsdb_dn->dn;
2022
2023                 status = dsdb_get_extended_dn_guid(dn, &p->guid, "GUID");
2024                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) ||
2025                     unlikely(GUID_all_zero(&p->guid))) {
2026                         /* we got a DN without a GUID - go find the GUID */
2027                         int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
2028                         if (ret != LDB_SUCCESS) {
2029                                 ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
2030                                                        ldb_dn_get_linearized(dn));
2031                                 if (ret == LDB_ERR_NO_SUCH_OBJECT &&
2032                                     LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
2033                                     ldb_attr_cmp(el->name, "member") == 0) {
2034                                         return LDB_ERR_UNWILLING_TO_PERFORM;
2035                                 }
2036                                 return ret;
2037                         }
2038                         ret = dsdb_set_extended_dn_guid(dn, &p->guid, "GUID");
2039                         if (ret != LDB_SUCCESS) {
2040                                 return ret;
2041                         }
2042                 } else if (!NT_STATUS_IS_OK(status)) {
2043                         return LDB_ERR_OPERATIONS_ERROR;
2044                 }
2045                 if (i > 0 && values_are_sorted) {
2046                         int cmp = parsed_dn_compare(p, &(*pdn)[i - 1]);
2047                         if (cmp < 0) {
2048                                 values_are_sorted = false;
2049                         }
2050                 }
2051                 /* keep a pointer to the original ldb_val */
2052                 p->v = v;
2053         }
2054         if (! values_are_sorted) {
2055                 TYPESAFE_QSORT(*pdn, el->num_values, parsed_dn_compare);
2056         }
2057         return LDB_SUCCESS;
2058 }
2059
2060 /*
2061  * Get a series of trusted message element values. The result is sorted by
2062  * GUID, even though the GUIDs might not be known. That works because we trust
2063  * the database to give us the elements like that if the
2064  * replmd_private->sorted_links flag is set.
2065  *
2066  * We also ensure that the links are in the Functional Level 2003
2067  * linked attributes format.
2068  */
2069 static int get_parsed_dns_trusted(struct ldb_module *module,
2070                                   struct replmd_private *replmd_private,
2071                                   TALLOC_CTX *mem_ctx,
2072                                   struct ldb_message_element *el,
2073                                   struct parsed_dn **pdn,
2074                                   const char *ldap_oid,
2075                                   struct ldb_request *parent)
2076 {
2077         unsigned int i;
2078         int ret;
2079         if (el == NULL) {
2080                 *pdn = NULL;
2081                 return LDB_SUCCESS;
2082         }
2083
2084         if (!replmd_private->sorted_links) {
2085                 /* We need to sort the list. This is the slow old path we want
2086                    to avoid.
2087                  */
2088                 ret = get_parsed_dns(module, mem_ctx, el, pdn, ldap_oid,
2089                                       parent);
2090                 if (ret != LDB_SUCCESS) {
2091                         return ret;
2092                 }
2093         } else {
2094                 /* Here we get a list of 'struct parsed_dns' without the parsing */
2095                 *pdn = talloc_zero_array(mem_ctx, struct parsed_dn,
2096                                          el->num_values);
2097                 if (!*pdn) {
2098                         ldb_module_oom(module);
2099                         return LDB_ERR_OPERATIONS_ERROR;
2100                 }
2101
2102                 for (i = 0; i < el->num_values; i++) {
2103                         (*pdn)[i].v = &el->values[i];
2104                 }
2105         }
2106
2107         /*
2108          * This upgrades links to FL2003 style, and sorts the result
2109          * if that was needed.
2110          *
2111          * TODO: Add a database feature that asserts we have no FL2000
2112          *       style links to avoid this check or add a feature that
2113          *       uses a similar check to find sorted/unsorted links
2114          *       for an on-the-fly upgrade.
2115          */
2116
2117         ret = replmd_check_upgrade_links(ldb_module_get_ctx(module),
2118                                          *pdn, el->num_values,
2119                                          el,
2120                                          ldap_oid);
2121         if (ret != LDB_SUCCESS) {
2122                 return ret;
2123         }
2124
2125         return LDB_SUCCESS;
2126 }
2127
2128 /*
2129   build a new extended DN, including all meta data fields
2130
2131   RMD_FLAGS           = DSDB_RMD_FLAG_* bits
2132   RMD_ADDTIME         = originating_add_time
2133   RMD_INVOCID         = originating_invocation_id
2134   RMD_CHANGETIME      = originating_change_time
2135   RMD_ORIGINATING_USN = originating_usn
2136   RMD_LOCAL_USN       = local_usn
2137   RMD_VERSION         = version
2138  */
2139 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
2140                                const struct GUID *invocation_id, uint64_t seq_num,
2141                                uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted)
2142 {
2143         struct ldb_dn *dn = dsdb_dn->dn;
2144         const char *tstring, *usn_string, *flags_string;
2145         struct ldb_val tval;
2146         struct ldb_val iid;
2147         struct ldb_val usnv, local_usnv;
2148         struct ldb_val vers, flagsv;
2149         NTSTATUS status;
2150         int ret;
2151         const char *dnstring;
2152         char *vstring;
2153         uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
2154
2155         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
2156         if (!tstring) {
2157                 return LDB_ERR_OPERATIONS_ERROR;
2158         }
2159         tval = data_blob_string_const(tstring);
2160
2161         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
2162         if (!usn_string) {
2163                 return LDB_ERR_OPERATIONS_ERROR;
2164         }
2165         usnv = data_blob_string_const(usn_string);
2166
2167         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
2168         if (!usn_string) {
2169                 return LDB_ERR_OPERATIONS_ERROR;
2170         }
2171         local_usnv = data_blob_string_const(usn_string);
2172
2173         vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
2174         if (!vstring) {
2175                 return LDB_ERR_OPERATIONS_ERROR;
2176         }
2177         vers = data_blob_string_const(vstring);
2178
2179         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
2180         if (!NT_STATUS_IS_OK(status)) {
2181                 return LDB_ERR_OPERATIONS_ERROR;
2182         }
2183
2184         flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
2185         if (!flags_string) {
2186                 return LDB_ERR_OPERATIONS_ERROR;
2187         }
2188         flagsv = data_blob_string_const(flags_string);
2189
2190         ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
2191         if (ret != LDB_SUCCESS) return ret;
2192         ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
2193         if (ret != LDB_SUCCESS) return ret;
2194         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
2195         if (ret != LDB_SUCCESS) return ret;
2196         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
2197         if (ret != LDB_SUCCESS) return ret;
2198         ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
2199         if (ret != LDB_SUCCESS) return ret;
2200         ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
2201         if (ret != LDB_SUCCESS) return ret;
2202         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
2203         if (ret != LDB_SUCCESS) return ret;
2204
2205         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
2206         if (dnstring == NULL) {
2207                 return LDB_ERR_OPERATIONS_ERROR;
2208         }
2209         *v = data_blob_string_const(dnstring);
2210
2211         return LDB_SUCCESS;
2212 }
2213
2214 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
2215                                 struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
2216                                 uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
2217                                 uint32_t version, bool deleted);
2218
2219 /*
2220   check if any links need upgrading from w2k format
2221  */
2222 static int replmd_check_upgrade_links(struct ldb_context *ldb,
2223                                       struct parsed_dn *dns, uint32_t count,
2224                                       struct ldb_message_element *el,
2225                                       const char *ldap_oid)
2226 {
2227         uint32_t i;
2228         const struct GUID *invocation_id = NULL;
2229         for (i=0; i<count; i++) {
2230                 NTSTATUS status;
2231                 uint32_t version;
2232                 int ret;
2233                 if (dns[i].dsdb_dn == NULL) {
2234                         ret = really_parse_trusted_dn(dns, ldb, &dns[i],
2235                                                       ldap_oid);
2236                         if (ret != LDB_SUCCESS) {
2237                                 return LDB_ERR_INVALID_DN_SYNTAX;
2238                         }
2239                 }
2240
2241                 status = dsdb_get_extended_dn_uint32(dns[i].dsdb_dn->dn,
2242                                                      &version, "RMD_VERSION");
2243                 if (!NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
2244                         /*
2245                          *  We optimistically assume they are all the same; if
2246                          *  the first one is fixed, they are all fixed.
2247                          *
2248                          *  If the first one was *not* fixed and we find a
2249                          *  later one that is, that is an occasion to shout
2250                          *  with DEBUG(0).
2251                          */
2252                         if (i == 0) {
2253                                 return LDB_SUCCESS;
2254                         }
2255                         DEBUG(0, ("Mixed w2k and fixed format "
2256                                   "linked attributes\n"));
2257                         continue;
2258                 }
2259
2260                 if (invocation_id == NULL) {
2261                         invocation_id = samdb_ntds_invocation_id(ldb);
2262                         if (invocation_id == NULL) {
2263                                 return LDB_ERR_OPERATIONS_ERROR;
2264                         }
2265                 }
2266
2267
2268                 /* it's an old one that needs upgrading */
2269                 ret = replmd_update_la_val(el->values, dns[i].v,
2270                                            dns[i].dsdb_dn, dns[i].dsdb_dn,
2271                                            invocation_id, 1, 1, 0, 0, false);
2272                 if (ret != LDB_SUCCESS) {
2273                         return ret;
2274                 }
2275         }
2276
2277         /*
2278          * This sort() is critical for the operation of
2279          * get_parsed_dns_trusted() because callers of this function
2280          * expect a sorted list, and FL2000 style links are not
2281          * sorted.  In particular, as well as the upgrade case,
2282          * get_parsed_dns_trusted() is called from
2283          * replmd_delete_remove_link() even in FL2000 mode
2284          *
2285          * We do not normally pay the cost of the qsort() due to the
2286          * early return in the RMD_VERSION found case.
2287          */
2288         TYPESAFE_QSORT(dns, count, parsed_dn_compare);
2289         return LDB_SUCCESS;
2290 }
2291
2292 /*
2293   update an extended DN, including all meta data fields
2294
2295   see replmd_build_la_val for value names
2296  */
2297 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
2298                                 struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
2299                                 uint64_t usn, uint64_t local_usn, NTTIME nttime,
2300                                 uint32_t version, bool deleted)
2301 {
2302         struct ldb_dn *dn = dsdb_dn->dn;
2303         const char *tstring, *usn_string, *flags_string;
2304         struct ldb_val tval;
2305         struct ldb_val iid;
2306         struct ldb_val usnv, local_usnv;
2307         struct ldb_val vers, flagsv;
2308         const struct ldb_val *old_addtime;
2309         uint32_t old_version;
2310         NTSTATUS status;
2311         int ret;
2312         const char *dnstring;
2313         char *vstring;
2314         uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
2315
2316         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
2317         if (!tstring) {
2318                 return LDB_ERR_OPERATIONS_ERROR;
2319         }
2320         tval = data_blob_string_const(tstring);
2321
2322         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)usn);
2323         if (!usn_string) {
2324                 return LDB_ERR_OPERATIONS_ERROR;
2325         }
2326         usnv = data_blob_string_const(usn_string);
2327
2328         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
2329         if (!usn_string) {
2330                 return LDB_ERR_OPERATIONS_ERROR;
2331         }
2332         local_usnv = data_blob_string_const(usn_string);
2333
2334         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
2335         if (!NT_STATUS_IS_OK(status)) {
2336                 return LDB_ERR_OPERATIONS_ERROR;
2337         }
2338
2339         flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
2340         if (!flags_string) {
2341                 return LDB_ERR_OPERATIONS_ERROR;
2342         }
2343         flagsv = data_blob_string_const(flags_string);
2344
2345         ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
2346         if (ret != LDB_SUCCESS) return ret;
2347
2348         /* get the ADDTIME from the original */
2349         old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
2350         if (old_addtime == NULL) {
2351                 old_addtime = &tval;
2352         }
2353         if (dsdb_dn != old_dsdb_dn ||
2354             ldb_dn_get_extended_component(dn, "RMD_ADDTIME") == NULL) {
2355                 ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", old_addtime);
2356                 if (ret != LDB_SUCCESS) return ret;
2357         }
2358
2359         /* use our invocation id */
2360         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
2361         if (ret != LDB_SUCCESS) return ret;
2362
2363         /* changetime is the current time */
2364         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
2365         if (ret != LDB_SUCCESS) return ret;
2366
2367         /* update the USN */
2368         ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
2369         if (ret != LDB_SUCCESS) return ret;
2370
2371         ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
2372         if (ret != LDB_SUCCESS) return ret;
2373
2374         /* increase the version by 1 */
2375         status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version, "RMD_VERSION");
2376         if (NT_STATUS_IS_OK(status) && old_version >= version) {
2377                 version = old_version+1;
2378         }
2379         vstring = talloc_asprintf(dn, "%lu", (unsigned long)version);
2380         vers = data_blob_string_const(vstring);
2381         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
2382         if (ret != LDB_SUCCESS) return ret;
2383
2384         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
2385         if (dnstring == NULL) {
2386                 return LDB_ERR_OPERATIONS_ERROR;
2387         }
2388         *v = data_blob_string_const(dnstring);
2389
2390         return LDB_SUCCESS;
2391 }
2392
2393 /*
2394   handle adding a linked attribute
2395  */
2396 static int replmd_modify_la_add(struct ldb_module *module,
2397                                 struct replmd_private *replmd_private,
2398                                 const struct dsdb_schema *schema,
2399                                 struct ldb_message *msg,
2400                                 struct ldb_message_element *el,
2401                                 struct ldb_message_element *old_el,
2402                                 const struct dsdb_attribute *schema_attr,
2403                                 uint64_t seq_num,
2404                                 time_t t,
2405                                 struct ldb_dn *msg_dn,
2406                                 struct ldb_request *parent)
2407 {
2408         unsigned int i, j;
2409         struct parsed_dn *dns, *old_dns;
2410         TALLOC_CTX *tmp_ctx = talloc_new(msg);
2411         int ret;
2412         struct ldb_val *new_values = NULL;
2413         unsigned old_num_values = old_el ? old_el->num_values : 0;
2414         unsigned num_values = 0;
2415         unsigned max_num_values;
2416         const struct GUID *invocation_id;
2417         struct ldb_context *ldb = ldb_module_get_ctx(module);
2418         NTTIME now;
2419         unix_to_nt_time(&now, t);
2420
2421         invocation_id = samdb_ntds_invocation_id(ldb);
2422         if (!invocation_id) {
2423                 talloc_free(tmp_ctx);
2424                 return LDB_ERR_OPERATIONS_ERROR;
2425         }
2426
2427         /* get the DNs to be added, fully parsed.
2428          *
2429          * We need full parsing because they came off the wire and we don't
2430          * trust them, besides which we need their details to know where to put
2431          * them.
2432          */
2433         ret = get_parsed_dns(module, tmp_ctx, el, &dns,
2434                              schema_attr->syntax->ldap_oid, parent);
2435         if (ret != LDB_SUCCESS) {
2436                 talloc_free(tmp_ctx);
2437                 return ret;
2438         }
2439
2440         /* get the existing DNs, lazily parsed */
2441         ret = get_parsed_dns_trusted(module, replmd_private,
2442                                      tmp_ctx, old_el, &old_dns,
2443                                      schema_attr->syntax->ldap_oid, parent);
2444
2445         if (ret != LDB_SUCCESS) {
2446                 talloc_free(tmp_ctx);
2447                 return ret;
2448         }
2449
2450         max_num_values = old_num_values + el->num_values;
2451         if (max_num_values < old_num_values) {
2452                 DEBUG(0, ("we seem to have overflow in replmd_modify_la_add. "
2453                           "old values: %u, new values: %u, sum: %u",
2454                           old_num_values, el->num_values, max_num_values));
2455                 talloc_free(tmp_ctx);
2456                 return LDB_ERR_OPERATIONS_ERROR;
2457         }
2458
2459         new_values = talloc_zero_array(tmp_ctx, struct ldb_val, max_num_values);
2460
2461         if (new_values == NULL) {
2462                 ldb_module_oom(module);
2463                 talloc_free(tmp_ctx);
2464                 return LDB_ERR_OPERATIONS_ERROR;
2465         }
2466
2467         /*
2468          * For each new value, find where it would go in the list. If there is
2469          * a matching GUID there, we update the existing value; otherwise we
2470          * put it in place.
2471          */
2472         j = 0;
2473         for (i = 0; i < el->num_values; i++) {
2474                 struct parsed_dn *exact;
2475                 struct parsed_dn *next;
2476                 unsigned offset;
2477                 int err = parsed_dn_find(ldb, old_dns, old_num_values,
2478                                          &dns[i].guid,
2479                                          dns[i].dsdb_dn->dn,
2480                                          dns[i].dsdb_dn->extra_part, 0,
2481                                          &exact, &next,
2482                                          schema_attr->syntax->ldap_oid,
2483                                          true);
2484                 if (err != LDB_SUCCESS) {
2485                         talloc_free(tmp_ctx);
2486                         return err;
2487                 }
2488
2489                 if (exact != NULL) {
2490                         /*
2491                          * We are trying to add one that exists, which is only
2492                          * allowed if it was previously deleted.
2493                          *
2494                          * When we do undelete a link we change it in place.
2495                          * It will be copied across into the right spot in due
2496                          * course.
2497                          */
2498                         uint32_t rmd_flags;
2499                         rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
2500
2501                         if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
2502                                 struct GUID_txt_buf guid_str;
2503                                 ldb_asprintf_errstring(ldb,
2504                                                        "Attribute %s already "
2505                                                        "exists for target GUID %s",
2506                                                        el->name,
2507                                                        GUID_buf_string(&exact->guid,
2508                                                                        &guid_str));
2509                                 talloc_free(tmp_ctx);
2510                                 /* error codes for 'member' need to be
2511                                    special cased */
2512                                 if (ldb_attr_cmp(el->name, "member") == 0) {
2513                                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
2514                                 } else {
2515                                         return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
2516                                 }
2517                         }
2518
2519                         ret = replmd_update_la_val(new_values, exact->v,
2520                                                    dns[i].dsdb_dn,
2521                                                    exact->dsdb_dn,
2522                                                    invocation_id, seq_num,
2523                                                    seq_num, now, 0, false);
2524                         if (ret != LDB_SUCCESS) {
2525                                 talloc_free(tmp_ctx);
2526                                 return ret;
2527                         }
2528
2529                         ret = replmd_add_backlink(module, replmd_private,
2530                                                   schema,
2531                                                   msg_dn,
2532                                                   &dns[i].guid, 
2533                                                   true,
2534                                                   schema_attr,
2535                                                   parent);
2536                         if (ret != LDB_SUCCESS) {
2537                                 talloc_free(tmp_ctx);
2538                                 return ret;
2539                                 }
2540                         continue;
2541                 }
2542                 /*
2543                  * Here we don't have an exact match.
2544                  *
2545                  * If next is NULL, this one goes beyond the end of the
2546                  * existing list, so we need to add all of those ones first.
2547                  *
2548                  * If next is not NULL, we need to add all the ones before
2549                  * next.
2550                  */
2551                 if (next == NULL) {
2552                         offset = old_num_values;
2553                 } else {
2554                         /* next should have been parsed, but let's make sure */
2555                         if (next->dsdb_dn == NULL) {
2556                                 ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
2557                                                               schema_attr->syntax->ldap_oid);
2558                                 if (ret != LDB_SUCCESS) {
2559                                         return ret;
2560                                 }
2561                         }
2562                         offset = MIN(next - old_dns, old_num_values);
2563                 }
2564
2565                 /* put all the old ones before next on the list */
2566                 for (; j < offset; j++) {
2567                         new_values[num_values] = *old_dns[j].v;
2568                         num_values++;
2569                 }
2570
2571                 ret = replmd_add_backlink(module, replmd_private,
2572                                           schema, msg_dn,
2573                                           &dns[i].guid,
2574                                           true, schema_attr,
2575                                           parent);
2576                 /* Make the new linked attribute ldb_val. */
2577                 ret = replmd_build_la_val(new_values, &new_values[num_values],
2578                                           dns[i].dsdb_dn, invocation_id,
2579                                           seq_num, seq_num,
2580                                           now, 0, false);
2581                 if (ret != LDB_SUCCESS) {
2582                         talloc_free(tmp_ctx);
2583                         return ret;
2584                 }
2585                 num_values++;
2586                 if (ret != LDB_SUCCESS) {
2587                         talloc_free(tmp_ctx);
2588                         return ret;
2589                 }
2590         }
2591         /* copy the rest of the old ones (if any) */
2592         for (; j < old_num_values; j++) {
2593                 new_values[num_values] = *old_dns[j].v;
2594                 num_values++;
2595         }
2596
2597         talloc_steal(msg->elements, new_values);
2598         if (old_el != NULL) {
2599                 talloc_steal(msg->elements, old_el->values);
2600         }
2601         el->values = new_values;
2602         el->num_values = num_values;
2603
2604         talloc_free(tmp_ctx);
2605
2606         /* we now tell the backend to replace all existing values
2607            with the one we have constructed */
2608         el->flags = LDB_FLAG_MOD_REPLACE;
2609
2610         return LDB_SUCCESS;
2611 }
2612
2613
2614 /*
2615   handle deleting all active linked attributes
2616  */
2617 static int replmd_modify_la_delete(struct ldb_module *module,
2618                                    struct replmd_private *replmd_private,
2619                                    const struct dsdb_schema *schema,
2620                                    struct ldb_message *msg,
2621                                    struct ldb_message_element *el,
2622                                    struct ldb_message_element *old_el,
2623                                    const struct dsdb_attribute *schema_attr,
2624                                    uint64_t seq_num,
2625                                    time_t t,
2626                                    struct ldb_dn *msg_dn,
2627                                    struct ldb_request *parent)
2628 {
2629         unsigned int i;
2630         struct parsed_dn *dns, *old_dns;
2631         TALLOC_CTX *tmp_ctx = NULL;
2632         int ret;
2633         struct ldb_context *ldb = ldb_module_get_ctx(module);
2634         struct ldb_control *vanish_links_ctrl = NULL;
2635         bool vanish_links = false;
2636         unsigned int num_to_delete = el->num_values;
2637         uint32_t rmd_flags;
2638         const struct GUID *invocation_id;
2639         NTTIME now;
2640
2641         unix_to_nt_time(&now, t);
2642
2643         invocation_id = samdb_ntds_invocation_id(ldb);
2644         if (!invocation_id) {
2645                 return LDB_ERR_OPERATIONS_ERROR;
2646         }
2647
2648         if (old_el == NULL || old_el->num_values == 0) {
2649                 /* there is nothing to delete... */
2650                 if (num_to_delete == 0) {
2651                         /* and we're deleting nothing, so that's OK */
2652                         return LDB_SUCCESS;
2653                 }
2654                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
2655         }
2656
2657         tmp_ctx = talloc_new(msg);
2658         if (tmp_ctx == NULL) {
2659                 return LDB_ERR_OPERATIONS_ERROR;
2660         }
2661
2662         ret = get_parsed_dns(module, tmp_ctx, el, &dns,
2663                              schema_attr->syntax->ldap_oid, parent);
2664         if (ret != LDB_SUCCESS) {
2665                 talloc_free(tmp_ctx);
2666                 return ret;
2667         }
2668
2669         ret = get_parsed_dns_trusted(module, replmd_private,
2670                                      tmp_ctx, old_el, &old_dns,
2671                                      schema_attr->syntax->ldap_oid, parent);
2672
2673         if (ret != LDB_SUCCESS) {
2674                 talloc_free(tmp_ctx);
2675                 return ret;
2676         }
2677
2678         if (parent) {
2679                 vanish_links_ctrl = ldb_request_get_control(parent, DSDB_CONTROL_REPLMD_VANISH_LINKS);
2680                 if (vanish_links_ctrl) {
2681                         vanish_links = true;
2682                         vanish_links_ctrl->critical = false;
2683                 }
2684         }
2685
2686         /* we empty out el->values here to avoid damage if we return early. */
2687         el->num_values = 0;
2688         el->values = NULL;
2689
2690         /*
2691          * If vanish links is set, we are actually removing members of
2692          *  old_el->values; otherwise we are just marking them deleted.
2693          *
2694          * There is a special case when no values are given: we remove them
2695          * all. When we have the vanish_links control we just have to remove
2696          * the backlinks and change our element to replace the existing values
2697          * with the empty list.
2698          */
2699
2700         if (num_to_delete == 0) {
2701                 for (i = 0; i < old_el->num_values; i++) {
2702                         struct parsed_dn *p = &old_dns[i];
2703                         if (p->dsdb_dn == NULL) {
2704                                 ret = really_parse_trusted_dn(tmp_ctx, ldb, p,
2705                                                               schema_attr->syntax->ldap_oid);
2706                                 if (ret != LDB_SUCCESS) {
2707                                         return ret;
2708                                 }
2709                         }
2710                         ret = replmd_add_backlink(module, replmd_private,
2711                                                   schema, msg_dn, &p->guid,
2712                                                   false, schema_attr,
2713                                                   parent);
2714                         if (ret != LDB_SUCCESS) {
2715                                 talloc_free(tmp_ctx);
2716                                 return ret;
2717                         }
2718                         if (vanish_l