replmd: Pass old_el into replmd_process_linked_attribute()
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /*
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005-2013
6    Copyright (C) Andrew Tridgell 2005-2009
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8    Copyright (C) Matthieu Patou <mat@samba.org> 2010-2011
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb repl_meta_data module
28  *
29  *  Description: - add a unique objectGUID onto every new record,
30  *               - handle whenCreated, whenChanged timestamps
31  *               - handle uSNCreated, uSNChanged numbers
32  *               - handle replPropertyMetaData attribute
33  *
34  *  Author: Simo Sorce
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb_module.h"
40 #include "dsdb/samdb/samdb.h"
41 #include "dsdb/common/proto.h"
42 #include "dsdb/common/util.h"
43 #include "../libds/common/flags.h"
44 #include "librpc/gen_ndr/irpc.h"
45 #include "librpc/gen_ndr/ndr_misc.h"
46 #include "librpc/gen_ndr/ndr_drsuapi.h"
47 #include "librpc/gen_ndr/ndr_drsblobs.h"
48 #include "param/param.h"
49 #include "libcli/security/security.h"
50 #include "lib/util/dlinklist.h"
51 #include "dsdb/samdb/ldb_modules/util.h"
52 #include "lib/util/tsort.h"
53
54 #undef DBGC_CLASS
55 #define DBGC_CLASS            DBGC_DRS_REPL
56
57 /* the RMD_VERSION for linked attributes starts from 1 */
58 #define RMD_VERSION_INITIAL   1
59
60 /*
61  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
62  * Deleted Objects Container
63  */
64 static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000ULL;
65
66 struct replmd_private {
67         TALLOC_CTX *la_ctx;
68         struct la_group *la_list;
69         struct nc_entry {
70                 struct nc_entry *prev, *next;
71                 struct ldb_dn *dn;
72                 uint64_t mod_usn;
73                 uint64_t mod_usn_urgent;
74         } *ncs;
75         struct ldb_dn *schema_dn;
76         bool originating_updates;
77         bool sorted_links;
78         uint32_t total_links;
79         uint32_t num_processed;
80         bool recyclebin_enabled;
81         bool recyclebin_state_known;
82 };
83
84 /*
85  * groups link attributes together by source-object and attribute-ID,
86  * to improve processing efficiency (i.e. for 'member' attribute, which
87  * could have 100s or 1000s of links).
88  * Note this grouping is best effort - the same source object could still
89  * correspond to several la_groups (a lot depends on the order DRS sends
90  * the links in). The groups currently don't span replication chunks (which
91  * caps the size to ~1500 links by default).
92  */
93 struct la_group {
94         struct la_group *next, *prev;
95         struct la_entry *la_entries;
96 };
97
98 struct la_entry {
99         struct la_entry *next, *prev;
100         struct drsuapi_DsReplicaLinkedAttribute *la;
101         uint32_t dsdb_repl_flags;
102 };
103
104 struct replmd_replicated_request {
105         struct ldb_module *module;
106         struct ldb_request *req;
107
108         const struct dsdb_schema *schema;
109         struct GUID our_invocation_id;
110
111         /* the controls we pass down */
112         struct ldb_control **controls;
113
114         /*
115          * Backlinks for the replmd_add() case (we want to create
116          * backlinks after creating the user, but before the end of
117          * the ADD request) 
118          */
119         struct la_backlink *la_backlinks;
120
121         /* details for the mode where we apply a bunch of inbound replication meessages */
122         bool apply_mode;
123         uint32_t index_current;
124         struct dsdb_extended_replicated_objects *objs;
125
126         struct ldb_message *search_msg;
127         struct GUID local_parent_guid;
128
129         uint64_t seq_num;
130         bool is_urgent;
131
132         bool isDeleted;
133
134         bool fix_link_sid;
135 };
136
137 /*
138  * the result of replmd_process_linked_attribute(): either there was no change
139  * (update was ignored), a new link was added (either inactive or active), or
140  * an existing link was modified (active/inactive status may have changed).
141  */
142 typedef enum {
143         LINK_CHANGE_NONE,
144         LINK_CHANGE_ADDED,
145         LINK_CHANGE_MODIFIED,
146 } replmd_link_changed;
147
148 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
149 static int replmd_delete_internals(struct ldb_module *module, struct ldb_request *req, bool re_delete);
150 static int replmd_check_upgrade_links(struct ldb_context *ldb,
151                                       struct parsed_dn *dns, uint32_t count,
152                                       struct ldb_message_element *el,
153                                       const char *ldap_oid);
154 static int replmd_verify_link_target(struct replmd_replicated_request *ar,
155                                      TALLOC_CTX *mem_ctx,
156                                      struct la_entry *la_entry,
157                                      struct ldb_dn *src_dn,
158                                      const struct dsdb_attribute *attr);
159 static int replmd_get_la_entry_source(struct ldb_module *module,
160                                       struct la_entry *la_entry,
161                                       TALLOC_CTX *mem_ctx,
162                                       const struct dsdb_attribute **ret_attr,
163                                       struct ldb_message **source_msg);
164 static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
165                              struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
166                              uint64_t usn, uint64_t local_usn, NTTIME nttime,
167                              uint32_t version, bool deleted);
168
169 static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
170                                         struct ldb_context *ldb,
171                                         struct ldb_dn *dn,
172                                         const char *rdn_name,
173                                         const struct ldb_val *rdn_value,
174                                         struct GUID guid);
175
176 enum urgent_situation {
177         REPL_URGENT_ON_CREATE = 1,
178         REPL_URGENT_ON_UPDATE = 2,
179         REPL_URGENT_ON_DELETE = 4
180 };
181
182 enum deletion_state {
183         OBJECT_NOT_DELETED=1,
184         OBJECT_DELETED=2,
185         OBJECT_RECYCLED=3,
186         OBJECT_TOMBSTONE=4,
187         OBJECT_REMOVED=5
188 };
189
190 static bool replmd_recyclebin_enabled(struct ldb_module *module)
191 {
192         bool enabled = false;
193         struct replmd_private *replmd_private =
194                 talloc_get_type_abort(ldb_module_get_private(module),
195                                       struct replmd_private);
196
197         /*
198          * only lookup the recycle-bin state once per replication, then cache
199          * the result. This can save us 1000s of DB searches
200          */
201         if (!replmd_private->recyclebin_state_known) {
202                 int ret = dsdb_recyclebin_enabled(module, &enabled);
203                 if (ret != LDB_SUCCESS) {
204                         return false;
205                 }
206
207                 replmd_private->recyclebin_enabled = enabled;
208                 replmd_private->recyclebin_state_known = true;
209         }
210
211         return replmd_private->recyclebin_enabled;
212 }
213
214 static void replmd_deletion_state(struct ldb_module *module,
215                                   const struct ldb_message *msg,
216                                   enum deletion_state *current_state,
217                                   enum deletion_state *next_state)
218 {
219         bool enabled = false;
220
221         if (msg == NULL) {
222                 *current_state = OBJECT_REMOVED;
223                 if (next_state != NULL) {
224                         *next_state = OBJECT_REMOVED;
225                 }
226                 return;
227         }
228
229         enabled = replmd_recyclebin_enabled(module);
230
231         if (ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")) {
232                 if (!enabled) {
233                         *current_state = OBJECT_TOMBSTONE;
234                         if (next_state != NULL) {
235                                 *next_state = OBJECT_REMOVED;
236                         }
237                         return;
238                 }
239
240                 if (ldb_msg_check_string_attribute(msg, "isRecycled", "TRUE")) {
241                         *current_state = OBJECT_RECYCLED;
242                         if (next_state != NULL) {
243                                 *next_state = OBJECT_REMOVED;
244                         }
245                         return;
246                 }
247
248                 *current_state = OBJECT_DELETED;
249                 if (next_state != NULL) {
250                         *next_state = OBJECT_RECYCLED;
251                 }
252                 return;
253         }
254
255         *current_state = OBJECT_NOT_DELETED;
256         if (next_state == NULL) {
257                 return;
258         }
259
260         if (enabled) {
261                 *next_state = OBJECT_DELETED;
262         } else {
263                 *next_state = OBJECT_TOMBSTONE;
264         }
265 }
266
267 static const struct {
268         const char *update_name;
269         enum urgent_situation repl_situation;
270 } urgent_objects[] = {
271                 {"nTDSDSA", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_DELETE)},
272                 {"crossRef", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_DELETE)},
273                 {"attributeSchema", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
274                 {"classSchema", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
275                 {"secret", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
276                 {"rIDManager", (REPL_URGENT_ON_CREATE | REPL_URGENT_ON_UPDATE)},
277                 {NULL, 0}
278 };
279
280 /* Attributes looked for when updating or deleting, to check for a urgent replication needed */
281 static const char *urgent_attrs[] = {
282                 "lockoutTime",
283                 "pwdLastSet",
284                 "userAccountControl",
285                 NULL
286 };
287
288
289 static bool replmd_check_urgent_objectclass(const struct ldb_message_element *objectclass_el,
290                                         enum urgent_situation situation)
291 {
292         unsigned int i, j;
293         for (i=0; urgent_objects[i].update_name; i++) {
294
295                 if ((situation & urgent_objects[i].repl_situation) == 0) {
296                         continue;
297                 }
298
299                 for (j=0; j<objectclass_el->num_values; j++) {
300                         const struct ldb_val *v = &objectclass_el->values[j];
301                         if (ldb_attr_cmp((const char *)v->data, urgent_objects[i].update_name) == 0) {
302                                 return true;
303                         }
304                 }
305         }
306         return false;
307 }
308
309 static bool replmd_check_urgent_attribute(const struct ldb_message_element *el)
310 {
311         if (ldb_attr_in_list(urgent_attrs, el->name)) {
312                 return true;
313         }
314         return false;
315 }
316
317 static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar);
318
319 /*
320   initialise the module
321   allocate the private structure and build the list
322   of partition DNs for use by replmd_notify()
323  */
324 static int replmd_init(struct ldb_module *module)
325 {
326         struct replmd_private *replmd_private;
327         struct ldb_context *ldb = ldb_module_get_ctx(module);
328         static const char *samba_dsdb_attrs[] = { SAMBA_COMPATIBLE_FEATURES_ATTR, NULL };
329         struct ldb_dn *samba_dsdb_dn;
330         struct ldb_result *res;
331         int ret;
332         TALLOC_CTX *frame = talloc_stackframe();
333         replmd_private = talloc_zero(module, struct replmd_private);
334         if (replmd_private == NULL) {
335                 ldb_oom(ldb);
336                 TALLOC_FREE(frame);
337                 return LDB_ERR_OPERATIONS_ERROR;
338         }
339         ldb_module_set_private(module, replmd_private);
340
341         replmd_private->schema_dn = ldb_get_schema_basedn(ldb);
342
343         samba_dsdb_dn = ldb_dn_new(frame, ldb, "@SAMBA_DSDB");
344         if (!samba_dsdb_dn) {
345                 TALLOC_FREE(frame);
346                 return ldb_oom(ldb);
347         }
348
349         ret = dsdb_module_search_dn(module, frame, &res, samba_dsdb_dn,
350                                     samba_dsdb_attrs, DSDB_FLAG_NEXT_MODULE, NULL);
351         if (ret == LDB_SUCCESS) {
352                 replmd_private->sorted_links
353                         = ldb_msg_check_string_attribute(res->msgs[0],
354                                                          SAMBA_COMPATIBLE_FEATURES_ATTR,
355                                                          SAMBA_SORTED_LINKS_FEATURE);
356         }
357         TALLOC_FREE(frame);
358
359         return ldb_next_init(module);
360 }
361
362 /*
363   cleanup our per-transaction contexts
364  */
365 static void replmd_txn_cleanup(struct replmd_private *replmd_private)
366 {
367         talloc_free(replmd_private->la_ctx);
368         replmd_private->la_list = NULL;
369         replmd_private->la_ctx = NULL;
370         replmd_private->recyclebin_state_known = false;
371 }
372
373
374 struct la_backlink {
375         struct la_backlink *next, *prev;
376         const char *attr_name;
377         struct ldb_dn *forward_dn;
378         struct GUID target_guid;
379         bool active;
380 };
381
382 /*
383   a ldb_modify request operating on modules below the
384   current module
385  */
386 static int linked_attr_modify(struct ldb_module *module,
387                               const struct ldb_message *message,
388                               struct ldb_request *parent)
389 {
390         struct ldb_request *mod_req;
391         int ret;
392         struct ldb_context *ldb = ldb_module_get_ctx(module);
393         TALLOC_CTX *tmp_ctx = talloc_new(module);
394         struct ldb_result *res;
395
396         res = talloc_zero(tmp_ctx, struct ldb_result);
397         if (!res) {
398                 talloc_free(tmp_ctx);
399                 return ldb_oom(ldb_module_get_ctx(module));
400         }
401
402         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
403                                 message,
404                                 NULL,
405                                 res,
406                                 ldb_modify_default_callback,
407                                 parent);
408         LDB_REQ_SET_LOCATION(mod_req);
409         if (ret != LDB_SUCCESS) {
410                 talloc_free(tmp_ctx);
411                 return ret;
412         }
413
414         ret = ldb_request_add_control(mod_req, DSDB_CONTROL_REPLICATED_UPDATE_OID,
415                                       false, NULL);
416         if (ret != LDB_SUCCESS) {
417                 return ret;
418         }
419
420         /* Run the new request */
421         ret = ldb_next_request(module, mod_req);
422
423         if (ret == LDB_SUCCESS) {
424                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
425         }
426
427         talloc_free(tmp_ctx);
428         return ret;
429 }
430
431 /*
432   process a backlinks we accumulated during a transaction, adding and
433   deleting the backlinks from the target objects
434  */
435 static int replmd_process_backlink(struct ldb_module *module, struct la_backlink *bl, struct ldb_request *parent)
436 {
437         struct ldb_dn *target_dn, *source_dn;
438         int ret;
439         struct ldb_context *ldb = ldb_module_get_ctx(module);
440         struct ldb_message *msg;
441         TALLOC_CTX *frame = talloc_stackframe();
442         char *dn_string;
443
444         /*
445           - find DN of target
446           - find DN of source
447           - construct ldb_message
448               - either an add or a delete
449          */
450         ret = dsdb_module_dn_by_guid(module, frame, &bl->target_guid, &target_dn, parent);
451         if (ret != LDB_SUCCESS) {
452                 struct GUID_txt_buf guid_str;
453                 DBG_WARNING("Failed to find target DN for linked attribute with GUID %s\n",
454                             GUID_buf_string(&bl->target_guid, &guid_str));
455                 DBG_WARNING("Please run 'samba-tool dbcheck' to resolve any missing backlinks.\n");
456                 talloc_free(frame);
457                 return LDB_SUCCESS;
458         }
459
460         msg = ldb_msg_new(frame);
461         if (msg == NULL) {
462                 ldb_module_oom(module);
463                 talloc_free(frame);
464                 return LDB_ERR_OPERATIONS_ERROR;
465         }
466
467         source_dn = ldb_dn_copy(frame, bl->forward_dn);
468         if (!source_dn) {
469                 ldb_module_oom(module);
470                 talloc_free(frame);
471                 return LDB_ERR_OPERATIONS_ERROR;
472         } else {
473                 /* Filter down to the attributes we want in the backlink */
474                 const char *accept[] = { "GUID", "SID", NULL };
475                 ldb_dn_extended_filter(source_dn, accept);
476         }
477
478         /* construct a ldb_message for adding/deleting the backlink */
479         msg->dn = target_dn;
480         dn_string = ldb_dn_get_extended_linearized(frame, bl->forward_dn, 1);
481         if (!dn_string) {
482                 ldb_module_oom(module);
483                 talloc_free(frame);
484                 return LDB_ERR_OPERATIONS_ERROR;
485         }
486         ret = ldb_msg_add_steal_string(msg, bl->attr_name, dn_string);
487         if (ret != LDB_SUCCESS) {
488                 talloc_free(frame);
489                 return ret;
490         }
491         msg->elements[0].flags = bl->active?LDB_FLAG_MOD_ADD:LDB_FLAG_MOD_DELETE;
492
493         /* a backlink should never be single valued. Unfortunately the
494            exchange schema has a attribute
495            msExchBridgeheadedLocalConnectorsDNBL which is single
496            valued and a backlink. We need to cope with that by
497            ignoring the single value flag */
498         msg->elements[0].flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
499
500         ret = dsdb_module_modify(module, msg, DSDB_FLAG_NEXT_MODULE, parent);
501         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE && !bl->active) {
502                 /* we allow LDB_ERR_NO_SUCH_ATTRIBUTE as success to
503                    cope with possible corruption where the backlink has
504                    already been removed */
505                 DEBUG(3,("WARNING: backlink from %s already removed from %s - %s\n",
506                          ldb_dn_get_linearized(target_dn),
507                          ldb_dn_get_linearized(source_dn),
508                          ldb_errstring(ldb)));
509                 ret = LDB_SUCCESS;
510         } else if (ret != LDB_SUCCESS) {
511                 ldb_asprintf_errstring(ldb, "Failed to %s backlink from %s to %s - %s",
512                                        bl->active?"add":"remove",
513                                        ldb_dn_get_linearized(source_dn),
514                                        ldb_dn_get_linearized(target_dn),
515                                        ldb_errstring(ldb));
516                 talloc_free(frame);
517                 return ret;
518         }
519         talloc_free(frame);
520         return ret;
521 }
522
523 /*
524   add a backlink to the list of backlinks to add/delete in the prepare
525   commit
526
527   forward_dn is stolen onto the defereed context
528  */
529 static int replmd_defer_add_backlink(struct ldb_module *module,
530                                      struct replmd_private *replmd_private,
531                                      const struct dsdb_schema *schema,
532                                      struct replmd_replicated_request *ac,
533                                      struct ldb_dn *forward_dn,
534                                      struct GUID *target_guid, bool active,
535                                      const struct dsdb_attribute *schema_attr,
536                                      struct ldb_request *parent)
537 {
538         const struct dsdb_attribute *target_attr;
539         struct la_backlink *bl;
540         
541         bl = talloc(ac, struct la_backlink);
542         if (bl == NULL) {
543                 ldb_module_oom(module);
544                 return LDB_ERR_OPERATIONS_ERROR;
545         }
546
547         target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID ^ 1);
548         if (!target_attr) {
549                 /*
550                  * windows 2003 has a broken schema where the
551                  * definition of msDS-IsDomainFor is missing (which is
552                  * supposed to be the backlink of the
553                  * msDS-HasDomainNCs attribute
554                  */
555                 return LDB_SUCCESS;
556         }
557
558         bl->attr_name = target_attr->lDAPDisplayName;
559         bl->forward_dn = talloc_steal(bl, forward_dn);
560         bl->target_guid = *target_guid;
561         bl->active = active;
562
563         DLIST_ADD(ac->la_backlinks, bl);
564
565         return LDB_SUCCESS;
566 }
567
568 /*
569   add a backlink to the list of backlinks to add/delete in the prepare
570   commit
571  */
572 static int replmd_add_backlink(struct ldb_module *module,
573                                struct replmd_private *replmd_private,
574                                const struct dsdb_schema *schema,
575                                struct ldb_dn *forward_dn,
576                                struct GUID *target_guid, bool active,
577                                const struct dsdb_attribute *schema_attr,
578                                struct ldb_request *parent)
579 {
580         const struct dsdb_attribute *target_attr;
581         struct la_backlink bl;
582         int ret;
583         
584         target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID ^ 1);
585         if (!target_attr) {
586                 /*
587                  * windows 2003 has a broken schema where the
588                  * definition of msDS-IsDomainFor is missing (which is
589                  * supposed to be the backlink of the
590                  * msDS-HasDomainNCs attribute
591                  */
592                 return LDB_SUCCESS;
593         }
594
595         bl.attr_name = target_attr->lDAPDisplayName;
596         bl.forward_dn = forward_dn;
597         bl.target_guid = *target_guid;
598         bl.active = active;
599
600         ret = replmd_process_backlink(module, &bl, parent);
601         return ret;
602 }
603
604
605 /*
606  * Callback for most write operations in this module:
607  *
608  * notify the repl task that a object has changed. The notifies are
609  * gathered up in the replmd_private structure then written to the
610  * @REPLCHANGED object in each partition during the prepare_commit
611  */
612 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
613 {
614         int ret;
615         struct replmd_replicated_request *ac =
616                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
617         struct replmd_private *replmd_private =
618                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
619         struct nc_entry *modified_partition;
620         struct ldb_control *partition_ctrl;
621         const struct dsdb_control_current_partition *partition;
622
623         struct ldb_control **controls;
624
625         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
626
627         controls = ares->controls;
628         if (ldb_request_get_control(ac->req,
629                                     DSDB_CONTROL_CURRENT_PARTITION_OID) == NULL) {
630                 /*
631                  * Remove the current partition control from what we pass up
632                  * the chain if it hasn't been requested manually.
633                  */
634                 controls = ldb_controls_except_specified(ares->controls, ares,
635                                                          partition_ctrl);
636         }
637
638         if (ares->error != LDB_SUCCESS) {
639                 struct GUID_txt_buf guid_txt;
640                 struct ldb_message *msg = NULL;
641                 char *s = NULL;
642
643                 if (ac->apply_mode == false) {
644                         DBG_NOTICE("Originating update failure. Error is: %s\n",
645                                    ldb_strerror(ares->error));
646                         return ldb_module_done(ac->req, controls,
647                                                ares->response, ares->error);
648                 }
649
650                 msg = ac->objs->objects[ac->index_current].msg;
651                 /*
652                  * Set at DBG_NOTICE as once these start to happe, they
653                  * will happen a lot until resolved, due to repeated
654                  * replication.  The caller will probably print the
655                  * ldb error string anyway.
656                  */
657                 DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
658                            ldb_dn_get_linearized(msg->dn),
659                            ldb_strerror(ares->error));
660
661                 s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
662                                                      ac,
663                                                      LDB_CHANGETYPE_ADD,
664                                                      msg);
665
666                 DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
667                          ac->search_msg == NULL ? "ADD" : "MODIFY",
668                          GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
669                                          &guid_txt),
670                          s);
671                 talloc_free(s);
672                 return ldb_module_done(ac->req, controls,
673                                        ares->response, ares->error);
674         }
675
676         if (ares->type != LDB_REPLY_DONE) {
677                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
678                 return ldb_module_done(ac->req, NULL,
679                                        NULL, LDB_ERR_OPERATIONS_ERROR);
680         }
681
682         if (ac->apply_mode == false) {
683                 struct la_backlink *bl;
684                 /*
685                  * process our backlink list after an replmd_add(),
686                  * creating and deleting backlinks as necessary (this
687                  * code is sync).  The other cases are handled inline
688                  * with the modify.
689                  */
690                 for (bl=ac->la_backlinks; bl; bl=bl->next) {
691                         ret = replmd_process_backlink(ac->module, bl, ac->req);
692                         if (ret != LDB_SUCCESS) {
693                                 return ldb_module_done(ac->req, NULL,
694                                                        NULL, ret);
695                         }
696                 }
697         }
698         
699         if (!partition_ctrl) {
700                 ldb_set_errstring(ldb_module_get_ctx(ac->module),"No partition control on reply");
701                 return ldb_module_done(ac->req, NULL,
702                                        NULL, LDB_ERR_OPERATIONS_ERROR);
703         }
704
705         partition = talloc_get_type_abort(partition_ctrl->data,
706                                     struct dsdb_control_current_partition);
707
708         if (ac->seq_num > 0) {
709                 for (modified_partition = replmd_private->ncs; modified_partition;
710                      modified_partition = modified_partition->next) {
711                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
712                                 break;
713                         }
714                 }
715
716                 if (modified_partition == NULL) {
717                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
718                         if (!modified_partition) {
719                                 ldb_oom(ldb_module_get_ctx(ac->module));
720                                 return ldb_module_done(ac->req, NULL,
721                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
722                         }
723                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
724                         if (!modified_partition->dn) {
725                                 ldb_oom(ldb_module_get_ctx(ac->module));
726                                 return ldb_module_done(ac->req, NULL,
727                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
728                         }
729                         DLIST_ADD(replmd_private->ncs, modified_partition);
730                 }
731
732                 if (ac->seq_num > modified_partition->mod_usn) {
733                         modified_partition->mod_usn = ac->seq_num;
734                         if (ac->is_urgent) {
735                                 modified_partition->mod_usn_urgent = ac->seq_num;
736                         }
737                 }
738                 if (!ac->apply_mode) {
739                         replmd_private->originating_updates = true;
740                 }
741         }
742
743         if (ac->apply_mode) {
744                 ret = replmd_replicated_apply_isDeleted(ac);
745                 if (ret != LDB_SUCCESS) {
746                         return ldb_module_done(ac->req, NULL, NULL, ret);
747                 }
748                 return ret;
749         } else {
750                 /* free the partition control container here, for the
751                  * common path.  Other cases will have it cleaned up
752                  * eventually with the ares */
753                 talloc_free(partition_ctrl);
754                 return ldb_module_done(ac->req, controls,
755                                        ares->response, LDB_SUCCESS);
756         }
757 }
758
759
760 /*
761  * update a @REPLCHANGED record in each partition if there have been
762  * any writes of replicated data in the partition
763  */
764 static int replmd_notify_store(struct ldb_module *module, struct ldb_request *parent)
765 {
766         struct replmd_private *replmd_private =
767                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
768
769         while (replmd_private->ncs) {
770                 int ret;
771                 struct nc_entry *modified_partition = replmd_private->ncs;
772
773                 ret = dsdb_module_save_partition_usn(module, modified_partition->dn,
774                                                      modified_partition->mod_usn,
775                                                      modified_partition->mod_usn_urgent, parent);
776                 if (ret != LDB_SUCCESS) {
777                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
778                                  ldb_dn_get_linearized(modified_partition->dn)));
779                         return ret;
780                 }
781
782                 if (ldb_dn_compare(modified_partition->dn,
783                                    replmd_private->schema_dn) == 0) {
784                         struct ldb_result *ext_res;
785                         ret = dsdb_module_extended(module,
786                                                    replmd_private->schema_dn,
787                                                    &ext_res,
788                                                    DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID,
789                                                    ext_res,
790                                                    DSDB_FLAG_NEXT_MODULE,
791                                                    parent);
792                         if (ret != LDB_SUCCESS) {
793                                 return ret;
794                         }
795                         talloc_free(ext_res);
796                 }
797
798                 DLIST_REMOVE(replmd_private->ncs, modified_partition);
799                 talloc_free(modified_partition);
800         }
801
802         return LDB_SUCCESS;
803 }
804
805
806 /*
807   created a replmd_replicated_request context
808  */
809 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
810                                                          struct ldb_request *req)
811 {
812         struct ldb_context *ldb;
813         struct replmd_replicated_request *ac;
814         const struct GUID *our_invocation_id;
815
816         ldb = ldb_module_get_ctx(module);
817
818         ac = talloc_zero(req, struct replmd_replicated_request);
819         if (ac == NULL) {
820                 ldb_oom(ldb);
821                 return NULL;
822         }
823
824         ac->module = module;
825         ac->req = req;
826
827         ac->schema = dsdb_get_schema(ldb, ac);
828         if (!ac->schema) {
829                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
830                               "replmd_modify: no dsdb_schema loaded");
831                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
832                 talloc_free(ac);
833                 return NULL;
834         }
835
836         /* get our invocationId */
837         our_invocation_id = samdb_ntds_invocation_id(ldb);
838         if (!our_invocation_id) {
839                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
840                               "replmd_add: unable to find invocationId\n");
841                 talloc_free(ac);
842                 return NULL;
843         }
844         ac->our_invocation_id = *our_invocation_id;
845
846         return ac;
847 }
848
849 /*
850   add a time element to a record
851 */
852 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
853 {
854         struct ldb_message_element *el;
855         char *s;
856         int ret;
857
858         if (ldb_msg_find_element(msg, attr) != NULL) {
859                 return LDB_SUCCESS;
860         }
861
862         s = ldb_timestring(msg, t);
863         if (s == NULL) {
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         ret = ldb_msg_add_string(msg, attr, s);
868         if (ret != LDB_SUCCESS) {
869                 return ret;
870         }
871
872         el = ldb_msg_find_element(msg, attr);
873         /* always set as replace. This works because on add ops, the flag
874            is ignored */
875         el->flags = LDB_FLAG_MOD_REPLACE;
876
877         return LDB_SUCCESS;
878 }
879
880 /*
881   add a uint64_t element to a record
882 */
883 static int add_uint64_element(struct ldb_context *ldb, struct ldb_message *msg,
884                               const char *attr, uint64_t v)
885 {
886         struct ldb_message_element *el;
887         int ret;
888
889         if (ldb_msg_find_element(msg, attr) != NULL) {
890                 return LDB_SUCCESS;
891         }
892
893         ret = samdb_msg_add_uint64(ldb, msg, msg, attr, v);
894         if (ret != LDB_SUCCESS) {
895                 return ret;
896         }
897
898         el = ldb_msg_find_element(msg, attr);
899         /* always set as replace. This works because on add ops, the flag
900            is ignored */
901         el->flags = LDB_FLAG_MOD_REPLACE;
902
903         return LDB_SUCCESS;
904 }
905
906 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
907                                                    const struct replPropertyMetaData1 *m2,
908                                                    const uint32_t *rdn_attid)
909 {
910         /*
911          * This assignment seems inoccous, but it is critical for the
912          * system, as we need to do the comparisons as a unsigned
913          * quantity, not signed (enums are signed integers)
914          */
915         uint32_t attid_1 = m1->attid;
916         uint32_t attid_2 = m2->attid;
917
918         if (attid_1 == attid_2) {
919                 return 0;
920         }
921
922         /*
923          * See above regarding this being an unsigned comparison.
924          * Otherwise when the high bit is set on non-standard
925          * attributes, they would end up first, before objectClass
926          * (0).
927          */
928         return attid_1 > attid_2 ? 1 : -1;
929 }
930
931 static int replmd_replPropertyMetaDataCtr1_verify(struct ldb_context *ldb,
932                                                   struct replPropertyMetaDataCtr1 *ctr1,
933                                                   struct ldb_dn *dn)
934 {
935         if (ctr1->count == 0) {
936                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
937                               "No elements found in replPropertyMetaData for %s!\n",
938                               ldb_dn_get_linearized(dn));
939                 return LDB_ERR_CONSTRAINT_VIOLATION;
940         }
941
942         /* the objectClass attribute is value 0x00000000, so must be first */
943         if (ctr1->array[0].attid != DRSUAPI_ATTID_objectClass) {
944                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
945                               "No objectClass found in replPropertyMetaData for %s!\n",
946                               ldb_dn_get_linearized(dn));
947                 return LDB_ERR_OBJECT_CLASS_VIOLATION;
948         }
949
950         return LDB_SUCCESS;
951 }
952
953 static int replmd_replPropertyMetaDataCtr1_sort_and_verify(struct ldb_context *ldb,
954                                                            struct replPropertyMetaDataCtr1 *ctr1,
955                                                            struct ldb_dn *dn)
956 {
957         /* Note this is O(n^2) for the almost-sorted case, which this is */
958         LDB_TYPESAFE_QSORT(ctr1->array, ctr1->count, NULL,
959                            replmd_replPropertyMetaData1_attid_sort);
960         return replmd_replPropertyMetaDataCtr1_verify(ldb, ctr1, dn);
961 }
962
963 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
964                                                  const struct ldb_message_element *e2,
965                                                  const struct dsdb_schema *schema)
966 {
967         const struct dsdb_attribute *a1;
968         const struct dsdb_attribute *a2;
969
970         /*
971          * TODO: make this faster by caching the dsdb_attribute pointer
972          *       on the ldb_messag_element
973          */
974
975         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
976         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
977
978         /*
979          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
980          *       in the schema
981          */
982         if (!a1 || !a2) {
983                 return strcasecmp(e1->name, e2->name);
984         }
985         if (a1->attributeID_id == a2->attributeID_id) {
986                 return 0;
987         }
988         return a1->attributeID_id > a2->attributeID_id ? 1 : -1;
989 }
990
991 static void replmd_ldb_message_sort(struct ldb_message *msg,
992                                     const struct dsdb_schema *schema)
993 {
994         LDB_TYPESAFE_QSORT(msg->elements, msg->num_elements, schema, replmd_ldb_message_element_attid_sort);
995 }
996
997 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
998                                const struct GUID *invocation_id,
999                                uint64_t local_usn, NTTIME nttime);
1000
1001 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
1002
1003 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1004                           struct ldb_message_element *el, struct parsed_dn **pdn,
1005                           const char *ldap_oid, struct ldb_request *parent);
1006
1007 static int check_parsed_dn_duplicates(struct ldb_module *module,
1008                                       struct ldb_message_element *el,
1009                                       struct parsed_dn *pdn);
1010
1011 /*
1012   fix up linked attributes in replmd_add.
1013   This involves setting up the right meta-data in extended DN
1014   components, and creating backlinks to the object
1015  */
1016 static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1017                              struct replmd_private *replmd_private,
1018                              struct ldb_message_element *el,
1019                              struct replmd_replicated_request *ac,
1020                              NTTIME now,
1021                              struct ldb_dn *forward_dn,
1022                              const struct dsdb_attribute *sa,
1023                              struct ldb_request *parent)
1024 {
1025         unsigned int i;
1026         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
1027         struct ldb_context *ldb = ldb_module_get_ctx(module);
1028         struct parsed_dn *pdn;
1029         /* We will take a reference to the schema in replmd_add_backlink */
1030         const struct dsdb_schema *schema = dsdb_get_schema(ldb, NULL);
1031         struct ldb_val *new_values = NULL;
1032         int ret;
1033
1034         if (dsdb_check_single_valued_link(sa, el) == LDB_SUCCESS) {
1035                 el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
1036         } else {
1037                 ldb_asprintf_errstring(ldb,
1038                                        "Attribute %s is single valued but "
1039                                        "more than one value has been supplied",
1040                                        el->name);
1041                 talloc_free(tmp_ctx);
1042                 return LDB_ERR_CONSTRAINT_VIOLATION;
1043         }
1044         
1045         ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
1046                              sa->syntax->ldap_oid, parent);
1047         if (ret != LDB_SUCCESS) {
1048                 talloc_free(tmp_ctx);
1049                 return ret;
1050         }
1051
1052         ret = check_parsed_dn_duplicates(module, el, pdn);
1053         if (ret != LDB_SUCCESS) {
1054                 talloc_free(tmp_ctx);
1055                 return ret;
1056         }
1057
1058         new_values = talloc_array(tmp_ctx, struct ldb_val, el->num_values);
1059         if (new_values == NULL) {
1060                 ldb_module_oom(module);
1061                 talloc_free(tmp_ctx);
1062                 return LDB_ERR_OPERATIONS_ERROR;
1063         }
1064
1065         for (i = 0; i < el->num_values; i++) {
1066                 struct parsed_dn *p = &pdn[i];
1067                 ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
1068                                           &ac->our_invocation_id,
1069                                           ac->seq_num, now);
1070                 if (ret != LDB_SUCCESS) {
1071                         talloc_free(tmp_ctx);
1072                         return ret;
1073                 }
1074
1075                 ret = replmd_defer_add_backlink(module, replmd_private,
1076                                                 schema, ac,
1077                                                 forward_dn, &p->guid, true, sa,
1078                                                 parent);
1079                 if (ret != LDB_SUCCESS) {
1080                         talloc_free(tmp_ctx);
1081                         return ret;
1082                 }
1083
1084                 new_values[i] = *p->v;
1085         }
1086         el->values = talloc_steal(mem_ctx, new_values);
1087
1088         talloc_free(tmp_ctx);
1089         return LDB_SUCCESS;
1090 }
1091
1092 static int replmd_add_make_extended_dn(struct ldb_request *req,
1093                                        const DATA_BLOB *guid_blob,
1094                                        struct ldb_dn **_extended_dn)
1095 {
1096         int ret;
1097         const DATA_BLOB *sid_blob;
1098         /* Calculate an extended DN for any linked attributes */
1099         struct ldb_dn *extended_dn = ldb_dn_copy(req, req->op.add.message->dn);
1100         if (!extended_dn) {
1101                 return LDB_ERR_OPERATIONS_ERROR;
1102         }
1103         ret = ldb_dn_set_extended_component(extended_dn, "GUID", guid_blob);
1104         if (ret != LDB_SUCCESS) {
1105                 return ret;
1106         }
1107
1108         sid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectSID");
1109         if (sid_blob != NULL) {
1110                 ret = ldb_dn_set_extended_component(extended_dn, "SID", sid_blob);
1111                 if (ret != LDB_SUCCESS) {
1112                         return ret;
1113                 }
1114         }
1115         *_extended_dn = extended_dn;
1116         return LDB_SUCCESS;
1117 }
1118
1119 /*
1120   intercept add requests
1121  */
1122 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
1123 {
1124         struct ldb_context *ldb;
1125         struct ldb_control *control;
1126         struct replmd_replicated_request *ac;
1127         enum ndr_err_code ndr_err;
1128         struct ldb_request *down_req;
1129         struct ldb_message *msg;
1130         const DATA_BLOB *guid_blob;
1131         DATA_BLOB guid_blob_stack;
1132         struct GUID guid;
1133         uint8_t guid_data[16];
1134         struct replPropertyMetaDataBlob nmd;
1135         struct ldb_val nmd_value;
1136         struct ldb_dn *extended_dn = NULL;
1137         
1138         /*
1139          * The use of a time_t here seems odd, but as the NTTIME
1140          * elements are actually declared as NTTIME_1sec in the IDL,
1141          * getting a higher resolution timestamp is not required.
1142          */
1143         time_t t = time(NULL);
1144         NTTIME now;
1145         char *time_str;
1146         int ret;
1147         unsigned int i;
1148         unsigned int functional_level;
1149         uint32_t ni=0;
1150         bool allow_add_guid = false;
1151         bool remove_current_guid = false;
1152         bool is_urgent = false;
1153         bool is_schema_nc = false;
1154         struct ldb_message_element *objectclass_el;
1155         struct replmd_private *replmd_private =
1156                 talloc_get_type_abort(ldb_module_get_private(module), struct replmd_private);
1157
1158         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
1159         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
1160         if (control) {
1161                 allow_add_guid = true;
1162         }
1163
1164         /* do not manipulate our control entries */
1165         if (ldb_dn_is_special(req->op.add.message->dn)) {
1166                 return ldb_next_request(module, req);
1167         }
1168
1169         ldb = ldb_module_get_ctx(module);
1170
1171         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
1172
1173         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
1174         if (guid_blob != NULL) {
1175                 if (!allow_add_guid) {
1176                         ldb_set_errstring(ldb,
1177                                           "replmd_add: it's not allowed to add an object with objectGUID!");
1178                         return LDB_ERR_UNWILLING_TO_PERFORM;
1179                 } else {
1180                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
1181                         if (!NT_STATUS_IS_OK(status)) {
1182                                 ldb_set_errstring(ldb,
1183                                                   "replmd_add: Unable to parse the 'objectGUID' as a GUID!");
1184                                 return LDB_ERR_UNWILLING_TO_PERFORM;
1185                         }
1186                         /* we remove this attribute as it can be a string and
1187                          * will not be treated correctly and then we will re-add
1188                          * it later on in the good format */
1189                         remove_current_guid = true;
1190                 }
1191         } else {
1192                 /* a new GUID */
1193                 guid = GUID_random();
1194                 
1195                 guid_blob_stack = data_blob_const(guid_data, sizeof(guid_data));
1196                 
1197                 /* This can't fail */
1198                 ndr_push_struct_into_fixed_blob(&guid_blob_stack, &guid,
1199                                                 (ndr_push_flags_fn_t)ndr_push_GUID);
1200                 guid_blob = &guid_blob_stack;
1201         }
1202
1203         ac = replmd_ctx_init(module, req);
1204         if (ac == NULL) {
1205                 return ldb_module_oom(module);
1206         }
1207
1208         functional_level = dsdb_functional_level(ldb);
1209
1210         /* Get a sequence number from the backend */
1211         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
1212         if (ret != LDB_SUCCESS) {
1213                 talloc_free(ac);
1214                 return ret;
1215         }
1216
1217         /* we have to copy the message as the caller might have it as a const */
1218         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
1219         if (msg == NULL) {
1220                 ldb_oom(ldb);
1221                 talloc_free(ac);
1222                 return LDB_ERR_OPERATIONS_ERROR;
1223         }
1224
1225         /* generated times */
1226         unix_to_nt_time(&now, t);
1227         time_str = ldb_timestring(msg, t);
1228         if (!time_str) {
1229                 ldb_oom(ldb);
1230                 talloc_free(ac);
1231                 return LDB_ERR_OPERATIONS_ERROR;
1232         }
1233         if (remove_current_guid) {
1234                 ldb_msg_remove_attr(msg,"objectGUID");
1235         }
1236
1237         /*
1238          * remove autogenerated attributes
1239          */
1240         ldb_msg_remove_attr(msg, "whenCreated");
1241         ldb_msg_remove_attr(msg, "whenChanged");
1242         ldb_msg_remove_attr(msg, "uSNCreated");
1243         ldb_msg_remove_attr(msg, "uSNChanged");
1244         ldb_msg_remove_attr(msg, "replPropertyMetaData");
1245
1246         /*
1247          * readd replicated attributes
1248          */
1249         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
1250         if (ret != LDB_SUCCESS) {
1251                 ldb_oom(ldb);
1252                 talloc_free(ac);
1253                 return ret;
1254         }
1255
1256         /* build the replication meta_data */
1257         ZERO_STRUCT(nmd);
1258         nmd.version             = 1;
1259         nmd.ctr.ctr1.count      = msg->num_elements;
1260         nmd.ctr.ctr1.array      = talloc_array(msg,
1261                                                struct replPropertyMetaData1,
1262                                                nmd.ctr.ctr1.count);
1263         if (!nmd.ctr.ctr1.array) {
1264                 ldb_oom(ldb);
1265                 talloc_free(ac);
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         is_schema_nc = ldb_dn_compare_base(replmd_private->schema_dn, msg->dn) == 0;
1270
1271         for (i=0; i < msg->num_elements;) {
1272                 struct ldb_message_element *e = &msg->elements[i];
1273                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
1274                 const struct dsdb_attribute *sa;
1275
1276                 if (e->name[0] == '@') {
1277                         i++;
1278                         continue;
1279                 }
1280
1281                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
1282                 if (!sa) {
1283                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
1284                                       "replmd_add: attribute '%s' not defined in schema\n",
1285                                       e->name);
1286                         talloc_free(ac);
1287                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1288                 }
1289
1290                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
1291                         /* if the attribute is not replicated (0x00000001)
1292                          * or constructed (0x00000004) it has no metadata
1293                          */
1294                         i++;
1295                         continue;
1296                 }
1297
1298                 if (sa->linkID != 0 && functional_level > DS_DOMAIN_FUNCTION_2000) {
1299                         if (extended_dn == NULL) {
1300                                 ret = replmd_add_make_extended_dn(req,
1301                                                                   guid_blob,
1302                                                                   &extended_dn);
1303                                 if (ret != LDB_SUCCESS) {
1304                                         talloc_free(ac);
1305                                         return ret;
1306                                 }
1307                         }                       
1308
1309                         /*
1310                          * Prepare the context for the backlinks and
1311                          * create metadata for the forward links.  The
1312                          * backlinks are created in
1313                          * replmd_op_callback() after the successful
1314                          * ADD of the object.
1315                          */
1316                         ret = replmd_add_fix_la(module, msg->elements,
1317                                                 replmd_private, e,
1318                                                 ac, now,
1319                                                 extended_dn,
1320                                                 sa, req);
1321                         if (ret != LDB_SUCCESS) {
1322                                 talloc_free(ac);
1323                                 return ret;
1324                         }
1325                         /* linked attributes are not stored in
1326                            replPropertyMetaData in FL above w2k */
1327                         i++;
1328                         continue;
1329                 }
1330
1331                 m->attid   = dsdb_attribute_get_attid(sa, is_schema_nc);
1332                 m->version = 1;
1333                 if (m->attid == DRSUAPI_ATTID_isDeleted) {
1334                         const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
1335                         const char* rdn;
1336
1337                         if (rdn_val == NULL) {
1338                                 ldb_oom(ldb);
1339                                 talloc_free(ac);
1340                                 return LDB_ERR_OPERATIONS_ERROR;
1341                         }
1342
1343                         rdn = (const char*)rdn_val->data;
1344                         if (strcmp(rdn, "Deleted Objects") == 0) {
1345                                 /*
1346                                  * Set the originating_change_time to 29/12/9999 at 23:59:59
1347                                  * as specified in MS-ADTS 7.1.1.4.2 Deleted Objects Container
1348                                  */
1349                                 m->originating_change_time      = DELETED_OBJECT_CONTAINER_CHANGE_TIME;
1350                         } else {
1351                                 m->originating_change_time      = now;
1352                         }
1353                 } else {
1354                         m->originating_change_time      = now;
1355                 }
1356                 m->originating_invocation_id    = ac->our_invocation_id;
1357                 m->originating_usn              = ac->seq_num;
1358                 m->local_usn                    = ac->seq_num;
1359                 ni++;
1360
1361                 if (!(e->flags & DSDB_FLAG_INTERNAL_FORCE_META_DATA)) {
1362                         i++;
1363                         continue;
1364                 }
1365
1366                 e->flags &= ~DSDB_FLAG_INTERNAL_FORCE_META_DATA;
1367
1368                 if (e->num_values != 0) {
1369                         i++;
1370                         continue;
1371                 }
1372
1373                 ldb_msg_remove_element(msg, e);
1374         }
1375
1376         /* fix meta data count */
1377         nmd.ctr.ctr1.count = ni;
1378
1379         /*
1380          * sort meta data array
1381          */
1382         ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &nmd.ctr.ctr1, msg->dn);
1383         if (ret != LDB_SUCCESS) {
1384                 ldb_asprintf_errstring(ldb, "%s: error during direct ADD: %s", __func__, ldb_errstring(ldb));
1385                 talloc_free(ac);
1386                 return ret;
1387         }
1388
1389         /* generated NDR encoded values */
1390         ndr_err = ndr_push_struct_blob(&nmd_value, msg,
1391                                        &nmd,
1392                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1393         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1394                 ldb_oom(ldb);
1395                 talloc_free(ac);
1396                 return LDB_ERR_OPERATIONS_ERROR;
1397         }
1398
1399         /*
1400          * add the autogenerated values
1401          */
1402         ret = dsdb_msg_add_guid(msg, &guid, "objectGUID");
1403         if (ret != LDB_SUCCESS) {
1404                 ldb_oom(ldb);
1405                 talloc_free(ac);
1406                 return ret;
1407         }
1408         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
1409         if (ret != LDB_SUCCESS) {
1410                 ldb_oom(ldb);
1411                 talloc_free(ac);
1412                 return ret;
1413         }
1414         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
1415         if (ret != LDB_SUCCESS) {
1416                 ldb_oom(ldb);
1417                 talloc_free(ac);
1418                 return ret;
1419         }
1420         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
1421         if (ret != LDB_SUCCESS) {
1422                 ldb_oom(ldb);
1423                 talloc_free(ac);
1424                 return ret;
1425         }
1426         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1427         if (ret != LDB_SUCCESS) {
1428                 ldb_oom(ldb);
1429                 talloc_free(ac);
1430                 return ret;
1431         }
1432
1433         /*
1434          * sort the attributes by attid before storing the object
1435          */
1436         replmd_ldb_message_sort(msg, ac->schema);
1437
1438         /*
1439          * Assert that we do have an objectClass
1440          */
1441         objectclass_el = ldb_msg_find_element(msg, "objectClass");
1442         if (objectclass_el == NULL) {
1443                 ldb_asprintf_errstring(ldb, __location__
1444                                        ": objectClass missing on %s\n",
1445                                        ldb_dn_get_linearized(msg->dn));
1446                 talloc_free(ac);
1447                 return LDB_ERR_OBJECT_CLASS_VIOLATION;
1448         }
1449         is_urgent = replmd_check_urgent_objectclass(objectclass_el,
1450                                                         REPL_URGENT_ON_CREATE);
1451
1452         ac->is_urgent = is_urgent;
1453         ret = ldb_build_add_req(&down_req, ldb, ac,
1454                                 msg,
1455                                 req->controls,
1456                                 ac, replmd_op_callback,
1457                                 req);
1458
1459         LDB_REQ_SET_LOCATION(down_req);
1460         if (ret != LDB_SUCCESS) {
1461                 talloc_free(ac);
1462                 return ret;
1463         }
1464
1465         /* current partition control is needed by "replmd_op_callback" */
1466         if (ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID) == NULL) {
1467                 ret = ldb_request_add_control(down_req,
1468                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1469                                               false, NULL);
1470                 if (ret != LDB_SUCCESS) {
1471                         talloc_free(ac);
1472                         return ret;
1473                 }
1474         }
1475
1476         if (functional_level == DS_DOMAIN_FUNCTION_2000) {
1477                 ret = ldb_request_add_control(down_req, DSDB_CONTROL_APPLY_LINKS, false, NULL);
1478                 if (ret != LDB_SUCCESS) {
1479                         talloc_free(ac);
1480                         return ret;
1481                 }
1482         }
1483
1484         /* mark the control done */
1485         if (control) {
1486                 control->critical = 0;
1487         }
1488         /* go on with the call chain */
1489         return ldb_next_request(module, down_req);
1490 }
1491
1492
1493 /*
1494  * update the replPropertyMetaData for one element
1495  */
1496 static int replmd_update_rpmd_element(struct ldb_context *ldb,
1497                                       struct ldb_message *msg,
1498                                       struct ldb_message_element *el,
1499                                       struct ldb_message_element *old_el,
1500                                       struct replPropertyMetaDataBlob *omd,
1501                                       const struct dsdb_schema *schema,
1502                                       uint64_t *seq_num,
1503                                       const struct GUID *our_invocation_id,
1504                                       NTTIME now,
1505                                       bool is_schema_nc,
1506                                       bool is_forced_rodc,
1507                                       struct ldb_request *req)
1508 {
1509         uint32_t i;
1510         const struct dsdb_attribute *a;
1511         struct replPropertyMetaData1 *md1;
1512         bool may_skip = false;
1513         uint32_t attid;
1514
1515         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
1516         if (a == NULL) {
1517                 if (ldb_request_get_control(req, LDB_CONTROL_RELAX_OID)) {
1518                         /* allow this to make it possible for dbcheck
1519                            to remove bad attributes */
1520                         return LDB_SUCCESS;
1521                 }
1522
1523                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
1524                          el->name));
1525                 return LDB_ERR_OPERATIONS_ERROR;
1526         }
1527
1528         attid = dsdb_attribute_get_attid(a, is_schema_nc);
1529
1530         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
1531                 return LDB_SUCCESS;
1532         }
1533
1534         /*
1535          * if the attribute's value haven't changed, and this isn't
1536          * just a delete of everything then return LDB_SUCCESS Unless
1537          * we have the provision control or if the attribute is
1538          * interSiteTopologyGenerator as this page explain:
1539          * http://support.microsoft.com/kb/224815 this attribute is
1540          * periodicaly written by the DC responsible for the intersite
1541          * generation in a given site
1542          *
1543          * Unchanged could be deleting or replacing an already-gone
1544          * thing with an unconstrained delete/empty replace or a
1545          * replace with the same value, but not an add with the same
1546          * value because that could be about adding a duplicate (which
1547          * is for someone else to error out on).
1548          */
1549         if (old_el != NULL && ldb_msg_element_equal_ordered(el, old_el)) {
1550                 if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_REPLACE) {
1551                         may_skip = true;
1552                 }
1553         } else if (old_el == NULL && el->num_values == 0) {
1554                 if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_REPLACE) {
1555                         may_skip = true;
1556                 } else if (LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE) {
1557                         may_skip = true;
1558                 }
1559         } else if (a->linkID != 0 && LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
1560                    ldb_request_get_control(req, DSDB_CONTROL_REPLMD_VANISH_LINKS) != NULL) {
1561                 /*
1562                  * We intentionally skip the version bump when attempting to
1563                  * vanish links.
1564                  *
1565                  * The control is set by dbcheck and expunge-tombstones which
1566                  * both attempt to be non-replicating. Otherwise, making an
1567                  * alteration to the replication state would trigger a
1568                  * broadcast of all expunged objects.
1569                  */
1570                 may_skip = true;
1571         }
1572
1573         if (el->flags & DSDB_FLAG_INTERNAL_FORCE_META_DATA) {
1574                 may_skip = false;
1575                 el->flags &= ~DSDB_FLAG_INTERNAL_FORCE_META_DATA;
1576         }
1577
1578         if (may_skip) {
1579                 if (strcmp(el->name, "interSiteTopologyGenerator") != 0 &&
1580                     !ldb_request_get_control(req, LDB_CONTROL_PROVISION_OID)) {
1581                         /*
1582                          * allow this to make it possible for dbcheck
1583                          * to rebuild broken metadata
1584                          */
1585                         return LDB_SUCCESS;
1586                 }
1587         }
1588
1589         for (i=0; i<omd->ctr.ctr1.count; i++) {
1590                 /*
1591                  * First check if we find it under the msDS-IntID,
1592                  * then check if we find it under the OID and
1593                  * prefixMap ID.
1594                  *
1595                  * This allows the administrator to simply re-write
1596                  * the attributes and so restore replication, which is
1597                  * likely what they will try to do.
1598                  */
1599                 if (attid == omd->ctr.ctr1.array[i].attid) {
1600                         break;
1601                 }
1602
1603                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) {
1604                         break;
1605                 }
1606         }
1607
1608         if (a->linkID != 0 && dsdb_functional_level(ldb) > DS_DOMAIN_FUNCTION_2000) {
1609                 /* linked attributes are not stored in
1610                    replPropertyMetaData in FL above w2k, but we do
1611                    raise the seqnum for the object  */
1612                 if (*seq_num == 0 &&
1613                     ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num) != LDB_SUCCESS) {
1614                         return LDB_ERR_OPERATIONS_ERROR;
1615                 }
1616                 return LDB_SUCCESS;
1617         }
1618
1619         if (i == omd->ctr.ctr1.count) {
1620                 /* we need to add a new one */
1621                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array,
1622                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
1623                 if (omd->ctr.ctr1.array == NULL) {
1624                         ldb_oom(ldb);
1625                         return LDB_ERR_OPERATIONS_ERROR;
1626                 }
1627                 omd->ctr.ctr1.count++;
1628                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
1629         }
1630
1631         /* Get a new sequence number from the backend. We only do this
1632          * if we have a change that requires a new
1633          * replPropertyMetaData element
1634          */
1635         if (*seq_num == 0) {
1636                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
1637                 if (ret != LDB_SUCCESS) {
1638                         return LDB_ERR_OPERATIONS_ERROR;
1639                 }
1640         }
1641
1642         md1 = &omd->ctr.ctr1.array[i];
1643         md1->version++;
1644         md1->attid = attid;
1645
1646         if (md1->attid == DRSUAPI_ATTID_isDeleted) {
1647                 const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
1648                 const char* rdn;
1649
1650                 if (rdn_val == NULL) {
1651                         ldb_oom(ldb);
1652                         return LDB_ERR_OPERATIONS_ERROR;
1653                 }
1654
1655                 rdn = (const char*)rdn_val->data;
1656                 if (strcmp(rdn, "Deleted Objects") == 0) {
1657                         /*
1658                          * Set the originating_change_time to 29/12/9999 at 23:59:59
1659                          * as specified in MS-ADTS 7.1.1.4.2 Deleted Objects Container
1660                          */
1661                         md1->originating_change_time    = DELETED_OBJECT_CONTAINER_CHANGE_TIME;
1662                 } else {
1663                         md1->originating_change_time    = now;
1664                 }
1665         } else {
1666                 md1->originating_change_time    = now;
1667         }
1668         md1->originating_invocation_id = *our_invocation_id;
1669         md1->originating_usn           = *seq_num;
1670         md1->local_usn                 = *seq_num;
1671
1672         if (is_forced_rodc) {
1673                 /* Force version to 0 to be overriden later via replication */
1674                 md1->version = 0;
1675         }
1676
1677         return LDB_SUCCESS;
1678 }
1679
1680 /*
1681  * Bump the replPropertyMetaData version on an attribute, and if it
1682  * has changed (or forced by leaving rdn_old NULL), update the value
1683  * in the entry.
1684  *
1685  * This is important, as calling a modify operation may not change the
1686  * version number if the values appear unchanged, but a rename between
1687  * parents bumps this value.
1688  *
1689  */
1690 static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
1691                                        struct ldb_message *msg,
1692                                        const struct ldb_val *rdn_new,
1693                                        const struct ldb_val *rdn_old,
1694                                        struct replPropertyMetaDataBlob *omd,
1695                                        struct replmd_replicated_request *ar,
1696                                        NTTIME now,
1697                                        bool is_schema_nc,
1698                                        bool is_forced_rodc)
1699 {
1700         const char *rdn_name = ldb_dn_get_rdn_name(msg->dn);
1701         const struct dsdb_attribute *rdn_attr =
1702                 dsdb_attribute_by_lDAPDisplayName(ar->schema, rdn_name);
1703         const char *attr_name = rdn_attr != NULL ?
1704                                 rdn_attr->lDAPDisplayName :
1705                                 rdn_name;
1706         struct ldb_message_element new_el = {
1707                 .flags = LDB_FLAG_MOD_REPLACE,
1708                 .name = attr_name,
1709                 .num_values = 1,
1710                 .values = discard_const_p(struct ldb_val, rdn_new)
1711         };
1712         struct ldb_message_element old_el = {
1713                 .flags = LDB_FLAG_MOD_REPLACE,
1714                 .name = attr_name,
1715                 .num_values = rdn_old ? 1 : 0,
1716                 .values = discard_const_p(struct ldb_val, rdn_old)
1717         };
1718
1719         if (ldb_msg_element_equal_ordered(&new_el, &old_el) == false) {
1720                 int ret = ldb_msg_add(msg, &new_el, LDB_FLAG_MOD_REPLACE);
1721                 if (ret != LDB_SUCCESS) {
1722                         return ldb_oom(ldb);
1723                 }
1724         }
1725
1726         return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
1727                                           omd, ar->schema, &ar->seq_num,
1728                                           &ar->our_invocation_id,
1729                                           now, is_schema_nc, is_forced_rodc,
1730                                           ar->req);
1731
1732 }
1733
1734 static uint64_t find_max_local_usn(struct replPropertyMetaDataBlob omd)
1735 {
1736         uint32_t count = omd.ctr.ctr1.count;
1737         uint64_t max = 0;
1738         uint32_t i;
1739         for (i=0; i < count; i++) {
1740                 struct replPropertyMetaData1 m = omd.ctr.ctr1.array[i];
1741                 if (max < m.local_usn) {
1742                         max = m.local_usn;
1743                 }
1744         }
1745         return max;
1746 }
1747
1748 /*
1749  * update the replPropertyMetaData object each time we modify an
1750  * object. This is needed for DRS replication, as the merge on the
1751  * client is based on this object
1752  */
1753 static int replmd_update_rpmd(struct ldb_module *module,
1754                               const struct dsdb_schema *schema,
1755                               struct ldb_request *req,
1756                               const char * const *rename_attrs,
1757                               struct ldb_message *msg, uint64_t *seq_num,
1758                               time_t t, bool is_schema_nc,
1759                               bool *is_urgent, bool *rodc)
1760 {
1761         const struct ldb_val *omd_value;
1762         enum ndr_err_code ndr_err;
1763         struct replPropertyMetaDataBlob omd;
1764         unsigned int i;
1765         NTTIME now;
1766         const struct GUID *our_invocation_id;
1767         int ret;
1768         const char * const *attrs = NULL;
1769         const char * const attrs2[] = { "uSNChanged", "objectClass", "instanceType", NULL };
1770         struct ldb_result *res;
1771         struct ldb_context *ldb;
1772         struct ldb_message_element *objectclass_el;
1773         enum urgent_situation situation;
1774         bool rmd_is_provided;
1775         bool rmd_is_just_resorted = false;
1776         const char *not_rename_attrs[4 + msg->num_elements];
1777         bool is_forced_rodc = false;
1778
1779         if (rename_attrs) {
1780                 attrs = rename_attrs;
1781         } else {
1782                 for (i = 0; i < msg->num_elements; i++) {
1783                         not_rename_attrs[i] = msg->elements[i].name;
1784                 }
1785                 not_rename_attrs[i] = "replPropertyMetaData";
1786                 not_rename_attrs[i+1] = "objectClass";
1787                 not_rename_attrs[i+2] = "instanceType";
1788                 not_rename_attrs[i+3] = NULL;
1789                 attrs = not_rename_attrs;
1790         }
1791
1792         ldb = ldb_module_get_ctx(module);
1793
1794         ret = samdb_rodc(ldb, rodc);
1795         if (ret != LDB_SUCCESS) {
1796                 DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
1797                 *rodc = false;
1798         }
1799
1800         if (*rodc &&
1801             ldb_request_get_control(req, DSDB_CONTROL_FORCE_RODC_LOCAL_CHANGE)) {
1802                 is_forced_rodc = true;
1803         }
1804
1805         our_invocation_id = samdb_ntds_invocation_id(ldb);
1806         if (!our_invocation_id) {
1807                 /* this happens during an initial vampire while
1808                    updating the schema */
1809                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
1810                 return LDB_SUCCESS;
1811         }
1812
1813         unix_to_nt_time(&now, t);
1814
1815         if (ldb_request_get_control(req, DSDB_CONTROL_CHANGEREPLMETADATA_OID)) {
1816                 rmd_is_provided = true;
1817                 if (ldb_request_get_control(req, DSDB_CONTROL_CHANGEREPLMETADATA_RESORT_OID)) {
1818                         rmd_is_just_resorted = true;
1819                 }
1820         } else {
1821                 rmd_is_provided = false;
1822         }
1823
1824         /* if isDeleted is present and is TRUE, then we consider we are deleting,
1825          * otherwise we consider we are updating */
1826         if (ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")) {
1827                 situation = REPL_URGENT_ON_DELETE;
1828         } else if (rename_attrs) {
1829                 situation = REPL_URGENT_ON_CREATE | REPL_URGENT_ON_DELETE;
1830         } else {
1831                 situation = REPL_URGENT_ON_UPDATE;
1832         }
1833
1834         if (rmd_is_provided) {
1835                 /* In this case the change_replmetadata control was supplied */
1836                 /* We check that it's the only attribute that is provided
1837                  * (it's a rare case so it's better to keep the code simplier)
1838                  * We also check that the highest local_usn is bigger or the same as
1839                  * uSNChanged. */
1840                 uint64_t db_seq;
1841                 if( msg->num_elements != 1 ||
1842                         strncmp(msg->elements[0].name,
1843                                 "replPropertyMetaData", 20) ) {
1844                         DEBUG(0,(__location__ ": changereplmetada control called without "\
1845                                 "a specified replPropertyMetaData attribute or with others\n"));
1846                         return LDB_ERR_OPERATIONS_ERROR;
1847                 }
1848                 if (situation != REPL_URGENT_ON_UPDATE) {
1849                         DEBUG(0,(__location__ ": changereplmetada control can't be called when deleting an object\n"));
1850                         return LDB_ERR_OPERATIONS_ERROR;
1851                 }
1852                 omd_value = ldb_msg_find_ldb_val(msg, "replPropertyMetaData");
1853                 if (!omd_value) {
1854                         DEBUG(0,(__location__ ": replPropertyMetaData was not specified for Object %s\n",
1855                                  ldb_dn_get_linearized(msg->dn)));
1856                         return LDB_ERR_OPERATIONS_ERROR;
1857                 }
1858                 ndr_err = ndr_pull_struct_blob(omd_value, msg, &omd,
1859                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1860                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1861                         DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
1862                                  ldb_dn_get_linearized(msg->dn)));
1863                         return LDB_ERR_OPERATIONS_ERROR;
1864                 }
1865
1866                 ret = dsdb_module_search_dn(module, msg, &res, msg->dn, attrs2,
1867                                             DSDB_FLAG_NEXT_MODULE |
1868                                             DSDB_SEARCH_SHOW_RECYCLED |
1869                                             DSDB_SEARCH_SHOW_EXTENDED_DN |
1870                                             DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
1871                                             DSDB_SEARCH_REVEAL_INTERNALS, req);
1872
1873                 if (ret != LDB_SUCCESS) {
1874                         return ret;
1875                 }
1876
1877                 if (rmd_is_just_resorted == false) {
1878                         *seq_num = find_max_local_usn(omd);
1879
1880                         db_seq = ldb_msg_find_attr_as_uint64(res->msgs[0], "uSNChanged", 0);
1881
1882                         /*
1883                          * The test here now allows for a new
1884                          * replPropertyMetaData with no change, if was
1885                          * just dbcheck re-sorting the values.
1886                          */
1887                         if (*seq_num <= db_seq) {
1888                                 DEBUG(0,(__location__ ": changereplmetada control provided but max(local_usn)" \
1889                                          " is less than uSNChanged (max = %lld uSNChanged = %lld)\n",
1890                                          (long long)*seq_num, (long long)db_seq));
1891                                 return LDB_ERR_OPERATIONS_ERROR;
1892                         }
1893                 }
1894
1895         } else {
1896                 /* search for the existing replPropertyMetaDataBlob. We need
1897                  * to use REVEAL and ask for DNs in storage format to support
1898                  * the check for values being the same in
1899                  * replmd_update_rpmd_element()
1900                  */
1901                 ret = dsdb_module_search_dn(module, msg, &res, msg->dn, attrs,
1902                                             DSDB_FLAG_NEXT_MODULE |
1903                                             DSDB_SEARCH_SHOW_RECYCLED |
1904                                             DSDB_SEARCH_SHOW_EXTENDED_DN |
1905                                             DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
1906                                             DSDB_SEARCH_REVEAL_INTERNALS, req);
1907                 if (ret != LDB_SUCCESS) {
1908                         return ret;
1909                 }
1910
1911                 omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
1912                 if (!omd_value) {
1913                         DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
1914                                  ldb_dn_get_linearized(msg->dn)));
1915                         return LDB_ERR_OPERATIONS_ERROR;
1916                 }
1917
1918                 ndr_err = ndr_pull_struct_blob(omd_value, msg, &omd,
1919                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1920                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1921                         DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
1922                                  ldb_dn_get_linearized(msg->dn)));
1923                         return LDB_ERR_OPERATIONS_ERROR;
1924                 }
1925
1926                 if (omd.version != 1) {
1927                         DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
1928                                  omd.version, ldb_dn_get_linearized(msg->dn)));
1929                         return LDB_ERR_OPERATIONS_ERROR;
1930                 }
1931
1932                 for (i=0; i<msg->num_elements;) {
1933                         struct ldb_message_element *el = &msg->elements[i];
1934                         struct ldb_message_element *old_el;
1935
1936                         old_el = ldb_msg_find_element(res->msgs[0], el->name);
1937                         ret = replmd_update_rpmd_element(ldb, msg, el, old_el,
1938                                                          &omd, schema, seq_num,
1939                                                          our_invocation_id,
1940                                                          now, is_schema_nc,
1941                                                          is_forced_rodc,
1942                                                          req);
1943                         if (ret != LDB_SUCCESS) {
1944                                 return ret;
1945                         }
1946
1947                         if (!*is_urgent && (situation == REPL_URGENT_ON_UPDATE)) {
1948                                 *is_urgent = replmd_check_urgent_attribute(el);
1949                         }
1950
1951                         if (!(el->flags & DSDB_FLAG_INTERNAL_FORCE_META_DATA)) {
1952                                 i++;
1953                                 continue;
1954                         }
1955
1956                         el->flags &= ~DSDB_FLAG_INTERNAL_FORCE_META_DATA;
1957
1958                         if (el->num_values != 0) {
1959                                 i++;
1960                                 continue;
1961                         }
1962
1963                         ldb_msg_remove_element(msg, el);
1964                 }
1965         }
1966
1967         /*
1968          * Assert that we have an objectClass attribute - this is major
1969          * corruption if we don't have this!
1970          */
1971         objectclass_el = ldb_msg_find_element(res->msgs[0], "objectClass");
1972         if (objectclass_el != NULL) {
1973                 /*
1974                  * Now check if this objectClass means we need to do urgent replication
1975                  */
1976                 if (!*is_urgent && replmd_check_urgent_objectclass(objectclass_el,
1977                                                                    situation)) {
1978                         *is_urgent = true;
1979                 }
1980         } else if (!ldb_request_get_control(req, DSDB_CONTROL_DBCHECK)) {
1981                 ldb_asprintf_errstring(ldb, __location__
1982                                        ": objectClass missing on %s\n",
1983                                        ldb_dn_get_linearized(msg->dn));
1984                 return LDB_ERR_OBJECT_CLASS_VIOLATION;
1985         }
1986
1987         /*
1988          * replmd_update_rpmd_element has done an update if the
1989          * seq_num is set
1990          */
1991         if (*seq_num != 0 || rmd_is_just_resorted == true) {
1992                 struct ldb_val *md_value;
1993                 struct ldb_message_element *el;
1994
1995                 /*if we are RODC and this is a DRSR update then its ok*/
1996                 if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)
1997                     && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)
1998                     && !is_forced_rodc) {
1999                         unsigned instanceType;
2000
2001                         if (*rodc) {
2002                                 ldb_set_errstring(ldb, "RODC modify is forbidden!");
2003                                 return LDB_ERR_REFERRAL;
2004                         }
2005
2006                         instanceType = ldb_msg_find_attr_as_uint(res->msgs[0], "instanceType", INSTANCE_TYPE_WRITE);
2007                         if (!(instanceType & INSTANCE_TYPE_WRITE)) {
2008                                 return ldb_error(ldb, LDB_ERR_UNWILLING_TO_PERFORM,
2009                                                  "cannot change replicated attribute on partial replica");
2010                         }
2011                 }
2012
2013                 md_value = talloc(msg, struct ldb_val);
2014                 if (md_value == NULL) {
2015                         ldb_oom(ldb);
2016                         return LDB_ERR_OPERATIONS_ERROR;
2017                 }
2018
2019                 ret = replmd_replPropertyMetaDataCtr1_sort_and_verify(ldb, &omd.ctr.ctr1, msg->dn);
2020                 if (ret != LDB_SUCCESS) {
2021                         ldb_asprintf_errstring(ldb, "%s: %s", __func__, ldb_errstring(ldb));
2022                         return ret;
2023                 }
2024
2025                 ndr_err = ndr_push_struct_blob(md_value, msg, &omd,
2026                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
2027                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2028                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
2029                                  ldb_dn_get_linearized(msg->dn)));
2030                         return LDB_ERR_OPERATIONS_ERROR;
2031                 }
2032
2033                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
2034                 if (ret != LDB_SUCCESS) {
2035                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
2036                                  ldb_dn_get_linearized(msg->dn)));
2037                         return ret;
2038                 }
2039
2040                 el->num_values = 1;
2041                 el->values = md_value;
2042         }
2043
2044         return LDB_SUCCESS;
2045 }
2046
2047 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
2048 {
2049         int ret = ndr_guid_compare(&pdn1->guid, &pdn2->guid);
2050         if (ret == 0) {
2051                 return data_blob_cmp(&pdn1->dsdb_dn->extra_part,
2052                                      &pdn2->dsdb_dn->extra_part);
2053         }
2054         return ret;
2055 }
2056
2057 /*
2058   get a series of message element values as an array of DNs and GUIDs
2059   the result is sorted by GUID
2060  */
2061 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
2062                           struct ldb_message_element *el, struct parsed_dn **pdn,
2063                           const char *ldap_oid, struct ldb_request *parent)
2064 {
2065         unsigned int i;
2066         bool values_are_sorted = true;
2067         struct ldb_context *ldb = ldb_module_get_ctx(module);
2068
2069         if (el == NULL) {
2070                 *pdn = NULL;
2071                 return LDB_SUCCESS;
2072         }
2073
2074         (*pdn) = talloc_array(mem_ctx, struct parsed_dn, el->num_values);
2075         if (!*pdn) {
2076                 ldb_module_oom(module);
2077                 return LDB_ERR_OPERATIONS_ERROR;
2078         }
2079
2080         for (i=0; i<el->num_values; i++) {
2081                 struct ldb_val *v = &el->values[i];
2082                 NTSTATUS status;
2083                 struct ldb_dn *dn;
2084                 struct parsed_dn *p;
2085
2086                 p = &(*pdn)[i];
2087
2088                 p->dsdb_dn = dsdb_dn_parse(*pdn, ldb, v, ldap_oid);
2089                 if (p->dsdb_dn == NULL) {
2090                         return LDB_ERR_INVALID_DN_SYNTAX;
2091                 }
2092
2093                 dn = p->dsdb_dn->dn;
2094
2095                 status = dsdb_get_extended_dn_guid(dn, &p->guid, "GUID");
2096                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) ||
2097                     unlikely(GUID_all_zero(&p->guid))) {
2098                         /* we got a DN without a GUID - go find the GUID */
2099                         int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
2100                         if (ret != LDB_SUCCESS) {
2101                                 char *dn_str = NULL;
2102                                 dn_str = ldb_dn_get_extended_linearized(mem_ctx,
2103                                                                         (dn), 1);
2104                                 ldb_asprintf_errstring(ldb,
2105                                                 "Unable to find GUID for DN %s\n",
2106                                                 dn_str);
2107                                 if (ret == LDB_ERR_NO_SUCH_OBJECT &&
2108                                     LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
2109                                     ldb_attr_cmp(el->name, "member") == 0) {
2110                                         return LDB_ERR_UNWILLING_TO_PERFORM;
2111                                 }
2112                                 return ret;
2113                         }
2114                         ret = dsdb_set_extended_dn_guid(dn, &p->guid, "GUID");
2115                         if (ret != LDB_SUCCESS) {
2116                                 return ret;
2117                         }
2118                 } else if (!NT_STATUS_IS_OK(status)) {
2119                         return LDB_ERR_OPERATIONS_ERROR;
2120                 }
2121                 if (i > 0 && values_are_sorted) {
2122                         int cmp = parsed_dn_compare(p, &(*pdn)[i - 1]);
2123                         if (cmp < 0) {
2124                                 values_are_sorted = false;
2125                         }
2126                 }
2127                 /* keep a pointer to the original ldb_val */
2128                 p->v = v;
2129         }
2130         if (! values_are_sorted) {
2131                 TYPESAFE_QSORT(*pdn, el->num_values, parsed_dn_compare);
2132         }
2133         return LDB_SUCCESS;
2134 }
2135
2136 /*
2137  * Get a series of trusted message element values. The result is sorted by
2138  * GUID, even though the GUIDs might not be known. That works because we trust
2139  * the database to give us the elements like that if the
2140  * replmd_private->sorted_links flag is set.
2141  *
2142  * We also ensure that the links are in the Functional Level 2003
2143  * linked attributes format.
2144  */
2145 static int get_parsed_dns_trusted(struct ldb_module *module,
2146                                   struct replmd_private *replmd_private,
2147                                   TALLOC_CTX *mem_ctx,
2148                                   struct ldb_message_element *el,
2149                                   struct parsed_dn **pdn,
2150                                   const char *ldap_oid,
2151                                   struct ldb_request *parent)
2152 {
2153         unsigned int i;
2154         int ret;
2155         if (el == NULL) {
2156                 *pdn = NULL;
2157                 return LDB_SUCCESS;
2158         }
2159
2160         if (!replmd_private->sorted_links) {
2161                 /* We need to sort the list. This is the slow old path we want
2162                    to avoid.
2163                  */
2164                 ret = get_parsed_dns(module, mem_ctx, el, pdn, ldap_oid,
2165                                       parent);
2166                 if (ret != LDB_SUCCESS) {
2167                         return ret;
2168                 }
2169         } else {
2170                 /* Here we get a list of 'struct parsed_dns' without the parsing */
2171                 *pdn = talloc_zero_array(mem_ctx, struct parsed_dn,
2172                                          el->num_values);
2173                 if (!*pdn) {
2174                         ldb_module_oom(module);
2175                         return LDB_ERR_OPERATIONS_ERROR;
2176                 }
2177
2178                 for (i = 0; i < el->num_values; i++) {
2179                         (*pdn)[i].v = &el->values[i];
2180                 }
2181         }
2182
2183         /*
2184          * This upgrades links to FL2003 style, and sorts the result
2185          * if that was needed.
2186          *
2187          * TODO: Add a database feature that asserts we have no FL2000
2188          *       style links to avoid this check or add a feature that
2189          *       uses a similar check to find sorted/unsorted links
2190          *       for an on-the-fly upgrade.
2191          */
2192
2193         ret = replmd_check_upgrade_links(ldb_module_get_ctx(module),
2194                                          *pdn, el->num_values,
2195                                          el,
2196                                          ldap_oid);
2197         if (ret != LDB_SUCCESS) {
2198                 return ret;
2199         }
2200
2201         return LDB_SUCCESS;
2202 }
2203
2204 /*
2205    Return LDB_SUCCESS if a parsed_dn list contains no duplicate values,
2206    otherwise an error code. For compatibility the error code differs depending
2207    on whether or not the attribute is "member".
2208
2209    As always, the parsed_dn list is assumed to be sorted.
2210  */
2211 static int check_parsed_dn_duplicates(struct ldb_module *module,
2212                                       struct ldb_message_element *el,
2213                                       struct parsed_dn *pdn)
2214 {
2215         unsigned int i;
2216         struct ldb_context *ldb = ldb_module_get_ctx(module);
2217
2218         for (i = 1; i < el->num_values; i++) {
2219                 struct parsed_dn *p = &pdn[i];
2220                 if (parsed_dn_compare(p, &pdn[i - 1]) == 0) {
2221                         ldb_asprintf_errstring(ldb,
2222                                                "Linked attribute %s has "
2223                                                "multiple identical values",
2224                                                el->name);
2225                         if (ldb_attr_cmp(el->name, "member") == 0) {
2226                                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
2227                         } else {
2228                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
2229                         }
2230                 }
2231         }
2232         return LDB_SUCCESS;
2233 }
2234
2235 /*
2236   build a new extended DN, including all meta data fields
2237
2238   RMD_FLAGS           = DSDB_RMD_FLAG_* bits
2239   RMD_ADDTIME         = originating_add_time
2240   RMD_INVOCID         = originating_invocation_id
2241   RMD_CHANGETIME      = originating_change_time
2242   RMD_ORIGINATING_USN = originating_usn
2243   RMD_LOCAL_USN       = local_usn
2244   RMD_VERSION         = version
2245  */
2246 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v,
2247                                struct dsdb_dn *dsdb_dn,
2248                                const struct GUID *invocation_id,
2249                                uint64_t local_usn, NTTIME nttime)
2250 {
2251         return replmd_set_la_val(mem_ctx, v, dsdb_dn, NULL, invocation_id,
2252                                  local_usn, local_usn, nttime,
2253                                  RMD_VERSION_INITIAL, false);
2254 }
2255
2256 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
2257                                 struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
2258                                 uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
2259                                 bool deleted);
2260
2261 /*
2262   check if any links need upgrading from w2k format
2263  */
2264 static int replmd_check_upgrade_links(struct ldb_context *ldb,
2265                                       struct parsed_dn *dns, uint32_t count,
2266                                       struct ldb_message_element *el,
2267                                       const char *ldap_oid)
2268 {
2269         uint32_t i;
2270         const struct GUID *invocation_id = NULL;
2271         for (i=0; i<count; i++) {
2272                 NTSTATUS status;
2273                 uint32_t version;
2274                 int ret;
2275                 if (dns[i].dsdb_dn == NULL) {
2276                         ret = really_parse_trusted_dn(dns, ldb, &dns[i],
2277                                                       ldap_oid);
2278                         if (ret != LDB_SUCCESS) {
2279                                 return LDB_ERR_INVALID_DN_SYNTAX;
2280                         }
2281                 }
2282
2283                 status = dsdb_get_extended_dn_uint32(dns[i].dsdb_dn->dn,
2284                                                      &version, "RMD_VERSION");
2285                 if (!NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
2286                         /*
2287                          *  We optimistically assume they are all the same; if
2288                          *  the first one is fixed, they are all fixed.
2289                          *
2290                          *  If the first one was *not* fixed and we find a
2291                          *  later one that is, that is an occasion to shout
2292                          *  with DEBUG(0).
2293                          */
2294                         if (i == 0) {
2295                                 return LDB_SUCCESS;
2296                         }
2297                         DEBUG(0, ("Mixed w2k and fixed format "
2298                                   "linked attributes\n"));
2299                         continue;
2300                 }
2301
2302                 if (invocation_id == NULL) {
2303                         invocation_id = samdb_ntds_invocation_id(ldb);
2304                         if (invocation_id == NULL) {
2305                                 return LDB_ERR_OPERATIONS_ERROR;
2306                         }
2307                 }
2308
2309
2310                 /* it's an old one that needs upgrading */
2311                 ret = replmd_update_la_val(el->values, dns[i].v,
2312                                            dns[i].dsdb_dn, dns[i].dsdb_dn,
2313                                            invocation_id, 1, 1, 0, false);
2314                 if (ret != LDB_SUCCESS) {
2315                         return ret;
2316                 }
2317         }
2318
2319         /*
2320          * This sort() is critical for the operation of
2321          * get_parsed_dns_trusted() because callers of this function
2322          * expect a sorted list, and FL2000 style links are not
2323          * sorted.  In particular, as well as the upgrade case,
2324          * get_parsed_dns_trusted() is called from
2325          * replmd_delete_remove_link() even in FL2000 mode
2326          *
2327          * We do not normally pay the cost of the qsort() due to the
2328          * early return in the RMD_VERSION found case.
2329          */
2330         TYPESAFE_QSORT(dns, count, parsed_dn_compare);
2331         return LDB_SUCCESS;
2332 }
2333
2334 /*
2335   Sets the value for a linked attribute, including all meta data fields
2336
2337   see replmd_build_la_val for value names
2338  */
2339 static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
2340                              struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
2341                              uint64_t usn, uint64_t local_usn, NTTIME nttime,
2342                              uint32_t version, bool deleted)
2343 {
2344         struct ldb_dn *dn = dsdb_dn->dn;
2345         const char *tstring, *usn_string, *flags_string;
2346         struct ldb_val tval;
2347         struct ldb_val iid;
2348         struct ldb_val usnv, local_usnv;
2349         struct ldb_val vers, flagsv;
2350         const struct ldb_val *old_addtime = NULL;
2351         NTSTATUS status;
2352         int ret;
2353         const char *dnstring;
2354         char *vstring;
2355         uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
2356
2357         tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
2358         if (!tstring) {
2359                 return LDB_ERR_OPERATIONS_ERROR;
2360         }
2361         tval = data_blob_string_const(tstring);
2362
2363         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)usn);
2364         if (!usn_string) {
2365                 return LDB_ERR_OPERATIONS_ERROR;
2366         }
2367         usnv = data_blob_string_const(usn_string);
2368
2369         usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
2370         if (!usn_string) {
2371                 return LDB_ERR_OPERATIONS_ERROR;
2372         }
2373         local_usnv = data_blob_string_const(usn_string);
2374
2375         status = GUID_to_ndr_blob(invocation_id, dn, &iid);
2376         if (!NT_STATUS_IS_OK(status)) {
2377                 return LDB_ERR_OPERATIONS_ERROR;
2378         }
2379
2380         flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
2381         if (!flags_string) {
2382                 return LDB_ERR_OPERATIONS_ERROR;
2383         }
2384         flagsv = data_blob_string_const(flags_string);
2385
2386         ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
2387         if (ret != LDB_SUCCESS) return ret;
2388
2389         /* get the ADDTIME from the original */
2390         if (old_dsdb_dn != NULL) {
2391                 old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn,
2392                                                             "RMD_ADDTIME");
2393         }
2394         if (old_addtime == NULL) {
2395                 old_addtime = &tval;
2396         }
2397         if (dsdb_dn != old_dsdb_dn ||
2398             ldb_dn_get_extended_component(dn, "RMD_ADDTIME") == NULL) {
2399                 ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", old_addtime);
2400                 if (ret != LDB_SUCCESS) return ret;
2401         }
2402
2403         /* use our invocation id */
2404         ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
2405         if (ret != LDB_SUCCESS) return ret;
2406
2407         /* changetime is the current time */
2408         ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
2409         if (ret != LDB_SUCCESS) return ret;
2410
2411         /* update the USN */
2412         ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
2413         if (ret != LDB_SUCCESS) return ret;
2414
2415         ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
2416         if (ret != LDB_SUCCESS) return ret;
2417
2418         vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
2419         vers = data_blob_string_const(vstring);
2420         ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
2421         if (ret != LDB_SUCCESS) return ret;
2422
2423         dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
2424         if (dnstring == NULL) {
2425                 return LDB_ERR_OPERATIONS_ERROR;
2426         }
2427         *v = data_blob_string_const(dnstring);
2428
2429         return LDB_SUCCESS;
2430 }
2431
2432 /**
2433  * Updates the value for a linked attribute, including all meta data fields
2434  */
2435 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
2436                                 struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
2437                                 uint64_t usn, uint64_t local_usn, NTTIME nttime,
2438                                 bool deleted)
2439 {
2440         uint32_t old_version;
2441         uint32_t version = RMD_VERSION_INITIAL;
2442         NTSTATUS status;
2443
2444         /*
2445          * We're updating the linked attribute locally, so increase the version
2446          * by 1 so that other DCs will see the change when it gets replicated out
2447          */
2448         status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version,
2449                                              "RMD_VERSION");
2450
2451         if (NT_STATUS_IS_OK(status)) {
2452                 version = old_version + 1;
2453         }
2454
2455         return replmd_set_la_val(mem_ctx, v, dsdb_dn, old_dsdb_dn, invocation_id,
2456                                  usn, local_usn, nttime, version, deleted);
2457 }
2458
2459 /*
2460   handle adding a linked attribute
2461  */
2462 static int replmd_modify_la_add(struct ldb_module *module,
2463                                 struct replmd_private *replmd_private,
2464                                 struct replmd_replicated_request *ac,
2465                                 struct ldb_message *msg,
2466                                 struct ldb_message_element *el,
2467                                 struct ldb_message_element *old_el,
2468                                 const struct dsdb_attribute *schema_attr,
2469                                 time_t t,
2470                                 struct ldb_dn *msg_dn,
2471                                 struct ldb_request *parent)
2472 {
2473         unsigned int i, j;
2474         struct parsed_dn *dns, *old_dns;
2475         TALLOC_CTX *tmp_ctx = talloc_new(msg);
2476         int ret;
2477         struct ldb_val *new_values = NULL;
2478         unsigned old_num_values = old_el ? old_el->num_values : 0;
2479         unsigned num_values = 0;
2480         unsigned max_num_values;
2481         struct ldb_context *ldb = ldb_module_get_ctx(module);
2482         NTTIME now;
2483         unix_to_nt_time(&now, t);
2484
2485         /* get the DNs to be added, fully parsed.
2486          *
2487          * We need full parsing because they came off the wire and we don't
2488          * trust them, besides which we need their details to know where to put
2489          * them.
2490          */
2491         ret = get_parsed_dns(module, tmp_ctx, el, &dns,
2492                              schema_attr->syntax->ldap_oid, parent);
2493         if (ret != LDB_SUCCESS) {
2494                 talloc_free(tmp_ctx);
2495                 return ret;
2496         }
2497
2498         /* get the existing DNs, lazily parsed */
2499         ret = get_parsed_dns_trusted(module, replmd_private,
2500                                      tmp_ctx, old_el, &old_dns,
2501                                      schema_attr->syntax->ldap_oid, parent);
2502
2503         if (ret != LDB_SUCCESS) {
2504                 talloc_free(tmp_ctx);
2505                 return ret;
2506         }
2507
2508         max_num_values = old_num_values + el->num_values;
2509         if (max_num_values < old_num_values) {
2510                 DEBUG(0, ("we seem to have overflow in replmd_modify_la_add. "
2511                           "old values: %u, new values: %u, sum: %u\n",
2512                           old_num_values, el->num_values, max_num_values));
2513                 talloc_free(tmp_ctx);
2514                 return LDB_ERR_OPERATIONS_ERROR;
2515         }
2516
2517         new_values = talloc_zero_array(tmp_ctx, struct ldb_val, max_num_values);
2518
2519         if (new_values == NULL) {
2520                 ldb_module_oom(module);
2521                 talloc_free(tmp_ctx);
2522                 return LDB_ERR_OPERATIONS_ERROR;
2523         }
2524
2525         /*
2526          * For each new value, find where it would go in the list. If there is
2527          * a matching GUID there, we update the existing value; otherwise we
2528          * put it in place.
2529          */
2530         j = 0;
2531         for (i = 0; i < el->num_values; i++) {
2532                 struct parsed_dn *exact;
2533                 struct parsed_dn *next;
2534                 unsigned offset;
2535                 int err = parsed_dn_find(ldb, old_dns, old_num_values,
2536                                          &dns[i].guid,
2537                                          dns[i].dsdb_dn->dn,
2538                                          dns[i].dsdb_dn->extra_part, 0,
2539                                          &exact, &next,
2540                                          schema_attr->syntax->ldap_oid,
2541                                          true);
2542                 if (err != LDB_SUCCESS) {
2543                         talloc_free(tmp_ctx);
2544                         return err;
2545                 }
2546
2547                 if (ac->fix_link_sid) {
2548                         char *fixed_dnstring = NULL;
2549                         struct dom_sid tmp_sid = { 0, };
2550                         DATA_BLOB sid_blob = data_blob_null;
2551                         enum ndr_err_code ndr_err;
2552                         NTSTATUS status;
2553                         int num;
2554
2555                         if (exact == NULL) {
2556                                 talloc_free(tmp_ctx);
2557                                 return ldb_operr(ldb);
2558                         }
2559
2560                         if (dns[i].dsdb_dn->dn_format != DSDB_NORMAL_DN) {
2561                                 talloc_free(tmp_ctx);
2562                                 return ldb_operr(ldb);
2563                         }
2564
2565                         /*
2566                          * Only "<GUID=...><SID=...>" is allowed.
2567                          *
2568                          * We get the GUID to just to find the old
2569                          * value and the SID in order to add it
2570                          * to the found value.
2571                          */
2572
2573                         num = ldb_dn_get_comp_num(dns[i].dsdb_dn->dn);
2574                         if (num != 0) {
2575                                 talloc_free(tmp_ctx);
2576                                 return ldb_operr(ldb);
2577                         }
2578
2579                         num = ldb_dn_get_extended_comp_num(dns[i].dsdb_dn->dn);
2580                         if (num != 2) {
2581                                 talloc_free(tmp_ctx);
2582                                 return ldb_operr(ldb);
2583                         }
2584
2585                         status = dsdb_get_extended_dn_sid(exact->dsdb_dn->dn,
2586                                                           &tmp_sid, "SID");
2587                         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
2588                                 /* this is what we expect */
2589                         } else if (NT_STATUS_IS_OK(status)) {
2590                                 struct GUID_txt_buf guid_str;
2591                                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
2592                                                        "i[%u] SID NOT MISSING... Attribute %s already "
2593                                                        "exists for target GUID %s, SID %s, DN: %s",
2594                                                        i, el->name,
2595                                                        GUID_buf_string(&exact->guid,
2596                                                                        &guid_str),
2597                                                        dom_sid_string(tmp_ctx, &tmp_sid),
2598                                                        dsdb_dn_get_extended_linearized(tmp_ctx,
2599                                                                exact->dsdb_dn, 1));
2600                                 talloc_free(tmp_ctx);
2601                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
2602                         } else {
2603                                 talloc_free(tmp_ctx);
2604                                 return ldb_operr(ldb);
2605                         }
2606
2607                         status = dsdb_get_extended_dn_sid(dns[i].dsdb_dn->dn,
2608                                                           &tmp_sid, "SID");
2609                         if (!NT_STATUS_IS_OK(status)) {
2610                                 struct GUID_txt_buf guid_str;
2611                                 ldb_asprintf_errstring(ldb,
2612                                                        "NO SID PROVIDED... Attribute %s already "
2613                                                        "exists for target GUID %s",
2614                                                        el->name,
2615                                                        GUID_buf_string(&exact->guid,
2616                                                                        &guid_str));
2617                                 talloc_free(tmp_ctx);
2618                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
2619                         }
2620
2621                         ndr_err = ndr_push_struct_blob(&sid_blob, tmp_ctx, &tmp_sid,
2622                                                        (ndr_push_flags_fn_t)ndr_push_dom_sid);
2623                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
2624                                 talloc_free(tmp_ctx);
2625                                 return ldb_operr(ldb);
2626                         }
2627
2628                         ret = ldb_dn_set_extended_component(exact->dsdb_dn->dn, "SID", &sid_blob);
2629                         data_blob_free(&sid_blob);
2630                         if (ret != LDB_SUCCESS) {
2631                                 talloc_free(tmp_ctx);
2632                                 return ret;
2633                         }
2634
2635                         fixed_dnstring = dsdb_dn_get_extended_linearized(
2636                                         new_values, exact->dsdb_dn, 1);
2637                         if (fixed_dnstring == NULL) {
2638                                 talloc_free(tmp_ctx);
2639                                 return ldb_operr(ldb);
2640                         }
2641
2642                         /*
2643                          * We just replace the existing value...
2644                          */
2645                         *exact->v = data_blob_string_const(fixed_dnstring);
2646
2647                         continue;
2648                 }
2649
2650                 if (exact != NULL) {
2651                         /*
2652                          * We are trying to add one that exists, which is only
2653                          * allowed if it was previously deleted.
2654                          *
2655                          * When we do undelete a link we change it in place.
2656                          * It will be copied across into the right spot in due
2657                          * course.
2658                          */
2659                         uint32_t rmd_flags;
2660                         rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
2661
2662                         if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
2663                                 struct GUID_txt_buf guid_str;
2664                                 ldb_asprintf_errstring(ldb,
2665                                                        "Attribute %s already "
2666                                                        "exists for target GUID %s",
2667                                                        el->name,
2668                                                        GUID_buf_string(&exact->guid,
2669                                                                        &guid_str));
2670                                 talloc_free(tmp_ctx);
2671                                 /* error codes for 'member' need to be
2672                                    special cased */
2673                                 if (ldb_attr_cmp(el->name, "member") == 0) {
2674                                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
2675                                 } else {
2676                                         return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
2677                                 }
2678                         }
2679
2680                         ret = replmd_update_la_val(new_values, exact->v,
2681                                                    dns[i].dsdb_dn,
2682                                                    exact->dsdb_dn,
2683                                                    &ac->our_invocation_id,
2684                                                    ac->seq_num, ac->seq_num,
2685                                                    now, false);
2686                         if (ret != LDB_SUCCESS) {
2687                                 talloc_free(tmp_ctx);
2688                                 return ret;
2689                         }
2690
2691                         ret = replmd_add_backlink(module, replmd_private,
2692                                                   ac->schema,
2693                                                   msg_dn,
2694                                                   &dns[i].guid, 
2695                                                   true,
2696                                                   schema_attr,
2697                                                   parent);
2698                         if (ret != LDB_SUCCESS) {
2699                                 talloc_free(tmp_ctx);
2700                                 return ret;
2701                                 }
2702                         continue;
2703                 }
2704                 /*
2705                  * Here we don't have an exact match.
2706                  *
2707                  * If next is NULL, this one goes beyond the end of the
2708                  * existing list, so we need to add all of those ones first.
2709                  *
2710                  * If next is not NULL, we need to add all the ones before
2711                  * next.
2712                  */
2713                 if (next == NULL) {
2714                         offset = old_num_values;
2715                 } else {
2716                         /* next should have been parsed, but let's make sure */
2717                         if (next->dsdb_dn == NULL) {
2718                                 ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
2719                                                               schema_attr->syntax->ldap_oid);
2720                                 if (ret != LDB_SUCCESS) {
2721                                         return ret;
2722                                 }
2723                         }
2724                         offset = MIN(next - old_dns, old_num_values);
2725                 }
2726
2727                 /* put all the old ones before next on the list */
2728                 for (; j < offset; j++) {
2729                         new_values[num_values] = *old_dns[j].v;
2730                         num_values++;
2731                 }
2732
2733                 ret = replmd_add_backlink(module, replmd_private,
2734                                           ac->schema, msg_dn,
2735                                           &dns[i].guid,
2736                                           true, schema_attr,
2737                                           parent);
2738                 /* Make the new linked attribute ldb_val. */
2739                 ret = replmd_build_la_val(new_values, &new_values[num_values],
2740                                           dns[i].dsdb_dn, &ac->our_invocation_id,
2741                                           ac->seq_num, now);
2742                 if (ret != LDB_SUCCESS) {
2743                         talloc_free(tmp_ctx);
2744                         return ret;
2745                 }
2746                 num_values++;
2747                 if (ret != LDB_SUCCESS) {
2748                         talloc_free(tmp_ctx);
2749                         return ret;
2750                 }
2751         }
2752         /* copy the rest of the old ones (if any) */
2753         for (; j < old_num_values; j++) {
2754                 new_values[num_values] = *old_dns[j].v;
2755                 num_values++;
2756         }
2757
2758         talloc_steal(msg->elements, new_values);
2759         if (old_el != NULL) {
2760                 talloc_steal(msg->elements, old_el->values);
2761         }
2762         el->values = new_values;
2763         el->num_values = num_values;
2764
2765         talloc_free(tmp_ctx);
2766
2767         /* we now tell the backend to replace all existing values
2768            with the one we have constructed */
2769         el->flags = LDB_FLAG_MOD_REPLACE;
2770
2771         return LDB_SUCCESS;
2772 }
2773
2774
2775 /*
2776   handle deleting all active linked attributes
2777  */
2778 static int replmd_modify_la_delete(struct ldb_module *module,
2779                                    struct replmd_private *replmd_private,
2780                                    struct replmd_replicated_request *ac,
2781                                    struct ldb_message *msg,
2782                                    struct ldb_message_element *el,
2783                                    struct ldb_message_element *old_el,
2784                                    const struct dsdb_attribute *schema_attr,
2785                                    time_t t,
2786                                    struct ldb_dn *msg_dn,
2787                                    struct ldb_request *parent)
2788 {
2789         unsigned int i;
2790         struct parsed_dn *dns, *old_dns;
2791         TALLOC_CTX *tmp_ctx = NULL;
2792         int ret;
2793         struct ldb_context *ldb = ldb_module_get_ctx(module);
2794         struct ldb_control *vanish_links_ctrl = NULL;
2795         bool vanish_links = false;
2796         unsigned int num_to_delete = el->num_values;
2797         uint32_t rmd_flags;
2798         NTTIME now;
2799
2800         unix_to_nt_time(&now, t);
2801
2802         if (old_el == NULL || old_el->num_values == 0) {
2803                 /* there is nothing to delete... */
2804                 if (num_to_delete == 0) {
2805                         /* and we're deleting nothing, so that's OK */
2806                         return LDB_SUCCESS;
2807                 }
2808                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
2809         }
2810
2811         tmp_ctx = talloc_new(msg);
2812         if (tmp_ctx == NULL) {
2813                 return LDB_ERR_OPERATIONS_ERROR;
2814         }
2815
2816         ret = get_parsed_dns(module, tmp_ctx, el, &dns,
2817                              schema_attr->syntax->ldap_oid, parent);
2818         if (ret != LDB_SUCCESS) {
2819                 talloc_free(tmp_ctx);
2820                 return ret;
2821         }
2822
2823         ret = get_parsed_dns_trusted(module, replmd_private,
2824                                      tmp_ctx, old_el, &old_dns,
2825                                      schema_attr->syntax->ldap_oid, parent);
2826
2827         if (ret != LDB_SUCCESS) {
2828                 talloc_free(tmp_ctx);
2829                 return ret;
2830         }
2831
2832         if (parent) {
2833                 vanish_links_ctrl = ldb_request_get_control(parent, DSDB_CONTROL_REPLMD_VANISH_LINKS);
2834                 if (vanish_links_ctrl) {
2835                         vanish_links = true;
2836                         vanish_links_ctrl->critical = false;
2837                 }
2838         }
2839
2840         /* we empty out el->values here to avoid damage if we return early. */
2841         el->num_values = 0;
2842         el->values = NULL;
2843
2844         /*
2845          * If vanish links is set, we are actually removing members of
2846          *  old_el->values; otherwise we are just marking them deleted.
2847          *
2848          * There is a special case when no values are given: we remove them
2849          * all. When we have the vanish_links control we just have to remove
2850          * the backlinks and change our element to replace the existing values
2851          * with the empty list.
2852          */
2853
2854         if (num_to_delete == 0) {
2855                 for (i = 0; i < old_el->num_values; i++) {
2856                         struct parsed_dn *p = &old_dns[i];
2857                         if (p->dsdb_dn == NULL) {
2858                                 ret = really_parse_trusted_dn(tmp_ctx, ldb, p,
2859                                                               schema_attr->syntax->ldap_oid);
2860                                 if (ret != LDB_SUCCESS) {
2861                                         return ret;
2862                                 }
2863                         }
2864                         ret = replmd_add_backlink(module, replmd_private,
2865                                                   ac->schema, msg_dn, &p->guid,
2866                                                   false, schema_attr,
2867                                                   parent);
2868                         if (ret != LDB_SUCCESS) {
2869                                 talloc_free(tmp_ctx);
2870                                 return ret;
2871                         }
2872                         if (vanish_links) {
2873                                 continue;
2874                         }
2875
2876                         rmd_flags = dsdb_dn_rmd_flags(p->dsdb_dn->dn);
2877                         if (rmd_flags & DSDB_RMD_FLAG_DELETED) {
2878                                 continue;
2879                         }
2880
2881                         ret = replmd_update_la_val(old_el->values, p->v,
2882                                                    p->dsdb_dn, p->dsdb_dn,
2883                                                    &ac->our_invocation_id,
2884                                                    ac->seq_num, ac->seq_num,
2885                                                    now, true);
2886                         if (ret != LDB_SUCCESS) {
2887                                 talloc_free(tmp_ctx);
2888                                 return ret;
2889                         }
2890                 }
2891
2892                 if (vanish_links) {
2893                         el->flags = LDB_FLAG_MOD_REPLACE;
2894                         talloc_free(tmp_ctx);
2895                         return LDB_SUCCESS;
2896                 }
2897         }
2898
2899
2900         for (i = 0; i < num_to_delete; i++) {
2901                 struct parsed_dn *p = &dns[i];
2902                 struct parsed_dn *exact = NULL;
2903                 struct parsed_dn *next = NULL;
2904                 ret = parsed_dn_find(ldb, old_dns, old_el->num_values,
2905                                      &p->guid,
2906                                      NULL,
2907                                      p->dsdb_dn->extra_part, 0,
2908                                      &exact, &next,
2909                                      schema_attr->syntax->ldap_oid,
2910                                      true);
2911                 if (ret != LDB_SUCCESS) {
2912                         talloc_free(tmp_ctx);
2913                         return ret;
2914                 }
2915                 if (exact == NULL) {
2916                         struct GUID_txt_buf buf;