s4-drs: fixed updating of uSNChanged in replmd_modify
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         struct nc_entry {
53                 struct nc_entry *prev, *next;
54                 struct ldb_dn *dn;
55                 uint64_t mod_usn;
56         } *ncs;
57 };
58
/*
 * Doubly linked list node wrapping one linked attribute record
 * (see replmd_private.la_list).
 */
struct la_entry {
        struct la_entry *next;
        struct la_entry *prev;
        /* the wrapped linked attribute */
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
63
/*
 * Per-request context, created by replmd_ctx_init() and handed to
 * replmd_op_callback() as the callback context of the request that is
 * sent down the module chain.
 */
struct replmd_replicated_request {
        /* the module instance handling the request */
        struct ldb_module *module;
        /* the original request as received from the caller */
        struct ldb_request *req;

        /* schema obtained from dsdb_get_schema() when the context is set up */
        const struct dsdb_schema *schema;

        /* the controls we pass down */
        struct ldb_control **controls;

        /*
         * details for the mode where we apply a bunch of inbound
         * replication messages: when apply_mode is set the callback
         * advances index_current and continues with the next object
         */
        bool apply_mode;
        uint32_t index_current;
        struct dsdb_extended_replicated_objects *objs;

        /* NOTE(review): not used in this part of the file - confirm purpose */
        struct ldb_message *search_msg;

        /*
         * sequence number allocated for this operation; zero means that
         * none was allocated and nothing needs recording per partition
         */
        uint64_t seq_num;
};

/* defined later in the file; needed by replmd_op_callback() */
static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
85
86 /*
87   initialise the module
88   allocate the private structure and build the list
89   of partition DNs for use by replmd_notify()
90  */
91 static int replmd_init(struct ldb_module *module)
92 {
93         struct replmd_private *replmd_private;
94         struct ldb_context *ldb = ldb_module_get_ctx(module);
95
96         replmd_private = talloc_zero(module, struct replmd_private);
97         if (replmd_private == NULL) {
98                 ldb_oom(ldb);
99                 return LDB_ERR_OPERATIONS_ERROR;
100         }
101         ldb_module_set_private(module, replmd_private);
102
103         return ldb_next_init(module);
104 }
105
106
107 /*
108  * Callback for most write operations in this module:
109  * 
110  * notify the repl task that a object has changed. The notifies are
111  * gathered up in the replmd_private structure then written to the
112  * @REPLCHANGED object in each partition during the prepare_commit
113  */
114 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
115 {
116         int ret;
117         struct replmd_replicated_request *ac = 
118                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
119         struct replmd_private *replmd_private = 
120                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
121         struct nc_entry *modified_partition;
122         struct ldb_control *partition_ctrl;
123         const struct dsdb_control_current_partition *partition;
124
125         struct ldb_control **controls;
126
127         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
128
129         /* Remove the 'partition' control from what we pass up the chain */
130         controls = controls_except_specified(ares->controls, ares, partition_ctrl);
131
132         if (ares->error != LDB_SUCCESS) {
133                 return ldb_module_done(ac->req, controls,
134                                         ares->response, ares->error);
135         }
136
137         if (ares->type != LDB_REPLY_DONE) {
138                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
139                 return ldb_module_done(ac->req, NULL,
140                                        NULL, LDB_ERR_OPERATIONS_ERROR);
141         }
142
143         if (!partition_ctrl) {
144                 return ldb_module_done(ac->req, NULL,
145                                        NULL, LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition = talloc_get_type_abort(partition_ctrl->data,
149                                     struct dsdb_control_current_partition);
150         
151         if (ac->seq_num > 0) {
152                 for (modified_partition = replmd_private->ncs; modified_partition; 
153                      modified_partition = modified_partition->next) {
154                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
155                                 break;
156                         }
157                 }
158                 
159                 if (modified_partition == NULL) {
160                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
161                         if (!modified_partition) {
162                                 ldb_oom(ldb_module_get_ctx(ac->module));
163                                 return ldb_module_done(ac->req, NULL,
164                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
165                         }
166                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
167                         if (!modified_partition->dn) {
168                                 ldb_oom(ldb_module_get_ctx(ac->module));
169                                 return ldb_module_done(ac->req, NULL,
170                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
171                         }
172                         DLIST_ADD(replmd_private->ncs, modified_partition);
173                 }
174
175                 if (ac->seq_num > modified_partition->mod_usn) {
176                         modified_partition->mod_usn = ac->seq_num;
177                 }
178         }
179
180         if (ac->apply_mode) {
181                 talloc_free(ares);
182                 ac->index_current++;
183                 
184                 ret = replmd_replicated_apply_next(ac);
185                 if (ret != LDB_SUCCESS) {
186                         return ldb_module_done(ac->req, NULL, NULL, ret);
187                 }
188                 return ret;
189         } else {
190                 /* free the partition control container here, for the
191                  * common path.  Other cases will have it cleaned up
192                  * eventually with the ares */
193                 talloc_free(partition_ctrl);
194                 return ldb_module_done(ac->req, 
195                                        controls_except_specified(controls, ares, partition_ctrl),
196                                        ares->response, LDB_SUCCESS);
197         }
198 }
199
200
201 /*
202  * update a @REPLCHANGED record in each partition if there have been
203  * any writes of replicated data in the partition
204  */
205 static int replmd_notify_store(struct ldb_module *module)
206 {
207         struct replmd_private *replmd_private = 
208                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
209         struct ldb_context *ldb = ldb_module_get_ctx(module);
210
211         while (replmd_private->ncs) {
212                 int ret;
213                 struct nc_entry *modified_partition = replmd_private->ncs;
214
215                 ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
216                 if (ret != LDB_SUCCESS) {
217                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
218                                  ldb_dn_get_linearized(modified_partition->dn)));
219                         return ret;
220                 }
221                 DLIST_REMOVE(replmd_private->ncs, modified_partition);
222                 talloc_free(modified_partition);
223         }
224
225         return LDB_SUCCESS;
226 }
227
228
229 /*
230   created a replmd_replicated_request context
231  */
232 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
233                                                          struct ldb_request *req)
234 {
235         struct ldb_context *ldb;
236         struct replmd_replicated_request *ac;
237
238         ldb = ldb_module_get_ctx(module);
239
240         ac = talloc_zero(req, struct replmd_replicated_request);
241         if (ac == NULL) {
242                 ldb_oom(ldb);
243                 return NULL;
244         }
245
246         ac->module = module;
247         ac->req = req;
248
249         ac->schema = dsdb_get_schema(ldb);
250         if (!ac->schema) {
251                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
252                               "replmd_modify: no dsdb_schema loaded");
253                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
254                 return NULL;
255         }
256
257         return ac;
258 }
259
260 /*
261   add a time element to a record
262 */
263 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
264 {
265         struct ldb_message_element *el;
266         char *s;
267
268         if (ldb_msg_find_element(msg, attr) != NULL) {
269                 return LDB_SUCCESS;
270         }
271
272         s = ldb_timestring(msg, t);
273         if (s == NULL) {
274                 return LDB_ERR_OPERATIONS_ERROR;
275         }
276
277         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
278                 return LDB_ERR_OPERATIONS_ERROR;
279         }
280
281         el = ldb_msg_find_element(msg, attr);
282         /* always set as replace. This works because on add ops, the flag
283            is ignored */
284         el->flags = LDB_FLAG_MOD_REPLACE;
285
286         return LDB_SUCCESS;
287 }
288
289 /*
290   add a uint64_t element to a record
291 */
292 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
293 {
294         struct ldb_message_element *el;
295
296         if (ldb_msg_find_element(msg, attr) != NULL) {
297                 return LDB_SUCCESS;
298         }
299
300         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
301                 return LDB_ERR_OPERATIONS_ERROR;
302         }
303
304         el = ldb_msg_find_element(msg, attr);
305         /* always set as replace. This works because on add ops, the flag
306            is ignored */
307         el->flags = LDB_FLAG_MOD_REPLACE;
308
309         return LDB_SUCCESS;
310 }
311
312 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
313                                                    const struct replPropertyMetaData1 *m2,
314                                                    const uint32_t *rdn_attid)
315 {
316         if (m1->attid == m2->attid) {
317                 return 0;
318         }
319
320         /*
321          * the rdn attribute should be at the end!
322          * so we need to return a value greater than zero
323          * which means m1 is greater than m2
324          */
325         if (m1->attid == *rdn_attid) {
326                 return 1;
327         }
328
329         /*
330          * the rdn attribute should be at the end!
331          * so we need to return a value less than zero
332          * which means m2 is greater than m1
333          */
334         if (m2->attid == *rdn_attid) {
335                 return -1;
336         }
337
338         return m1->attid - m2->attid;
339 }
340
341 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
342                                                 const struct dsdb_schema *schema,
343                                                 struct ldb_dn *dn)
344 {
345         const char *rdn_name;
346         const struct dsdb_attribute *rdn_sa;
347
348         rdn_name = ldb_dn_get_rdn_name(dn);
349         if (!rdn_name) {
350                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
351                 return LDB_ERR_OPERATIONS_ERROR;
352         }
353
354         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
355         if (rdn_sa == NULL) {
356                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
357                 return LDB_ERR_OPERATIONS_ERROR;                
358         }
359
360         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
361                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
362
363         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
364                   discard_const_p(void, &rdn_sa->attributeID_id), 
365                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
366
367         return LDB_SUCCESS;
368 }
369
370 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
371                                                  const struct ldb_message_element *e2,
372                                                  const struct dsdb_schema *schema)
373 {
374         const struct dsdb_attribute *a1;
375         const struct dsdb_attribute *a2;
376
377         /* 
378          * TODO: make this faster by caching the dsdb_attribute pointer
379          *       on the ldb_messag_element
380          */
381
382         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
383         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
384
385         /*
386          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
387          *       in the schema
388          */
389         if (!a1 || !a2) {
390                 return strcasecmp(e1->name, e2->name);
391         }
392
393         return a1->attributeID_id - a2->attributeID_id;
394 }
395
396 static void replmd_ldb_message_sort(struct ldb_message *msg,
397                                     const struct dsdb_schema *schema)
398 {
399         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
400                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
401 }
402
403 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
404 {
405         struct ldb_context *ldb;
406         struct ldb_control *control;
407         struct ldb_control **saved_controls;
408         struct replmd_replicated_request *ac;
409         enum ndr_err_code ndr_err;
410         struct ldb_request *down_req;
411         struct ldb_message *msg;
412         const DATA_BLOB *guid_blob;
413         struct GUID guid;
414         struct replPropertyMetaDataBlob nmd;
415         struct ldb_val nmd_value;
416         const struct GUID *our_invocation_id;
417         time_t t = time(NULL);
418         NTTIME now;
419         char *time_str;
420         int ret;
421         uint32_t i, ni=0;
422         bool allow_add_guid = false;
423         bool remove_current_guid = false;
424
425         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
426         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
427         if (control) {
428                 allow_add_guid = 1;
429         }
430
431         /* do not manipulate our control entries */
432         if (ldb_dn_is_special(req->op.add.message->dn)) {
433                 return ldb_next_request(module, req);
434         }
435
436         ldb = ldb_module_get_ctx(module);
437
438         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
439
440         ac = replmd_ctx_init(module, req);
441         if (!ac) {
442                 return LDB_ERR_OPERATIONS_ERROR;
443         }
444
445         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
446         if ( guid_blob != NULL ) {
447                 if( !allow_add_guid ) {
448                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
449                               "replmd_add: it's not allowed to add an object with objectGUID\n");
450                         talloc_free(ac);
451                         return LDB_ERR_UNWILLING_TO_PERFORM;
452                 } else {
453                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
454                         if ( !NT_STATUS_IS_OK(status)) {
455                                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
456                                       "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
457                                 talloc_free(ac);
458                                 return LDB_ERR_UNWILLING_TO_PERFORM;
459                         }
460                         /* we remove this attribute as it can be a string and will not be treated 
461                         correctly and then we will readd it latter on in the good format*/
462                         remove_current_guid = true;
463                 }
464         } else {
465                 /* a new GUID */
466                 guid = GUID_random();
467         }
468
469         /* Get a sequence number from the backend */
470         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
471         if (ret != LDB_SUCCESS) {
472                 talloc_free(ac);
473                 return ret;
474         }
475
476         /* get our invocationId */
477         our_invocation_id = samdb_ntds_invocation_id(ldb);
478         if (!our_invocation_id) {
479                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
480                               "replmd_add: unable to find invocationId\n");
481                 talloc_free(ac);
482                 return LDB_ERR_OPERATIONS_ERROR;
483         }
484
485         /* we have to copy the message as the caller might have it as a const */
486         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
487         if (msg == NULL) {
488                 ldb_oom(ldb);
489                 talloc_free(ac);
490                 return LDB_ERR_OPERATIONS_ERROR;
491         }
492
493         /* generated times */
494         unix_to_nt_time(&now, t);
495         time_str = ldb_timestring(msg, t);
496         if (!time_str) {
497                 ldb_oom(ldb);
498                 talloc_free(ac);
499                 return LDB_ERR_OPERATIONS_ERROR;
500         }
501         if (remove_current_guid) {
502                 ldb_msg_remove_attr(msg,"objectGUID");
503         }
504
505         /* 
506          * remove autogenerated attributes
507          */
508         ldb_msg_remove_attr(msg, "whenCreated");
509         ldb_msg_remove_attr(msg, "whenChanged");
510         ldb_msg_remove_attr(msg, "uSNCreated");
511         ldb_msg_remove_attr(msg, "uSNChanged");
512         ldb_msg_remove_attr(msg, "replPropertyMetaData");
513
514         /*
515          * readd replicated attributes
516          */
517         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
518         if (ret != LDB_SUCCESS) {
519                 ldb_oom(ldb);
520                 talloc_free(ac);
521                 return ret;
522         }
523
524         /* build the replication meta_data */
525         ZERO_STRUCT(nmd);
526         nmd.version             = 1;
527         nmd.ctr.ctr1.count      = msg->num_elements;
528         nmd.ctr.ctr1.array      = talloc_array(msg,
529                                                struct replPropertyMetaData1,
530                                                nmd.ctr.ctr1.count);
531         if (!nmd.ctr.ctr1.array) {
532                 ldb_oom(ldb);
533                 talloc_free(ac);
534                 return LDB_ERR_OPERATIONS_ERROR;
535         }
536
537         for (i=0; i < msg->num_elements; i++) {
538                 struct ldb_message_element *e = &msg->elements[i];
539                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
540                 const struct dsdb_attribute *sa;
541
542                 if (e->name[0] == '@') continue;
543
544                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
545                 if (!sa) {
546                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
547                                       "replmd_add: attribute '%s' not defined in schema\n",
548                                       e->name);
549                         talloc_free(ac);
550                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
551                 }
552
553                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
554                         /* if the attribute is not replicated (0x00000001)
555                          * or constructed (0x00000004) it has no metadata
556                          */
557                         continue;
558                 }
559
560                 m->attid                        = sa->attributeID_id;
561                 m->version                      = 1;
562                 m->originating_change_time      = now;
563                 m->originating_invocation_id    = *our_invocation_id;
564                 m->originating_usn              = ac->seq_num;
565                 m->local_usn                    = ac->seq_num;
566                 ni++;
567         }
568
569         /* fix meta data count */
570         nmd.ctr.ctr1.count = ni;
571
572         /*
573          * sort meta data array, and move the rdn attribute entry to the end
574          */
575         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
576         if (ret != LDB_SUCCESS) {
577                 talloc_free(ac);
578                 return ret;
579         }
580
581         /* generated NDR encoded values */
582         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
583                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
584                                        &nmd,
585                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
586         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
587                 ldb_oom(ldb);
588                 talloc_free(ac);
589                 return LDB_ERR_OPERATIONS_ERROR;
590         }
591
592         /*
593          * add the autogenerated values
594          */
595         ret = dsdb_msg_add_guid(msg, &guid, "objectGUID");
596         if (ret != LDB_SUCCESS) {
597                 ldb_oom(ldb);
598                 talloc_free(ac);
599                 return ret;
600         }
601         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
602         if (ret != LDB_SUCCESS) {
603                 ldb_oom(ldb);
604                 talloc_free(ac);
605                 return ret;
606         }
607         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
608         if (ret != LDB_SUCCESS) {
609                 ldb_oom(ldb);
610                 talloc_free(ac);
611                 return ret;
612         }
613         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
614         if (ret != LDB_SUCCESS) {
615                 ldb_oom(ldb);
616                 talloc_free(ac);
617                 return ret;
618         }
619         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
620         if (ret != LDB_SUCCESS) {
621                 ldb_oom(ldb);
622                 talloc_free(ac);
623                 return ret;
624         }
625
626         /*
627          * sort the attributes by attid before storing the object
628          */
629         replmd_ldb_message_sort(msg, ac->schema);
630
631         ret = ldb_build_add_req(&down_req, ldb, ac,
632                                 msg,
633                                 req->controls,
634                                 ac, replmd_op_callback,
635                                 req);
636         if (ret != LDB_SUCCESS) {
637                 talloc_free(ac);
638                 return ret;
639         }
640
641         /* if a control is there remove if from the modified request */
642         if (control && !save_controls(control, down_req, &saved_controls)) {
643                 talloc_free(ac);
644                 return LDB_ERR_OPERATIONS_ERROR;
645         }
646
647         /* go on with the call chain */
648         return ldb_next_request(module, down_req);
649 }
650
651
652 /*
653  * update the replPropertyMetaData for one element
654  */
655 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
656                                       struct ldb_message *msg,
657                                       struct ldb_message_element *el,
658                                       struct replPropertyMetaDataBlob *omd,
659                                       const struct dsdb_schema *schema,
660                                       uint64_t *seq_num,
661                                       const struct GUID *our_invocation_id,
662                                       NTTIME now)
663 {
664         int i;
665         const struct dsdb_attribute *a;
666         struct replPropertyMetaData1 *md1;
667
668         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
669         if (a == NULL) {
670                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
671                          el->name));
672                 return LDB_ERR_OPERATIONS_ERROR;
673         }
674
675         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
676                 return LDB_SUCCESS;
677         }
678
679         for (i=0; i<omd->ctr.ctr1.count; i++) {
680                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
681         }
682         if (i == omd->ctr.ctr1.count) {
683                 /* we need to add a new one */
684                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
685                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
686                 if (omd->ctr.ctr1.array == NULL) {
687                         ldb_oom(ldb);
688                         return LDB_ERR_OPERATIONS_ERROR;
689                 }
690                 omd->ctr.ctr1.count++;
691                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
692         }
693
694         /* Get a new sequence number from the backend. We only do this
695          * if we have a change that requires a new
696          * replPropertyMetaData element 
697          */
698         if (*seq_num == 0) {
699                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
700                 if (ret != LDB_SUCCESS) {
701                         return LDB_ERR_OPERATIONS_ERROR;
702                 }
703         }
704
705         md1 = &omd->ctr.ctr1.array[i];
706         md1->version++;
707         md1->attid                     = a->attributeID_id;
708         md1->originating_change_time   = now;
709         md1->originating_invocation_id = *our_invocation_id;
710         md1->originating_usn           = *seq_num;
711         md1->local_usn                 = *seq_num;
712         
713         return LDB_SUCCESS;
714 }
715
716 /*
717  * update the replPropertyMetaData object each time we modify an
718  * object. This is needed for DRS replication, as the merge on the
719  * client is based on this object 
720  */
721 static int replmd_update_rpmd(struct ldb_module *module, 
722                               const struct dsdb_schema *schema, 
723                               struct ldb_message *msg, uint64_t *seq_num)
724 {
725         const struct ldb_val *omd_value;
726         enum ndr_err_code ndr_err;
727         struct replPropertyMetaDataBlob omd;
728         int i;
729         time_t t = time(NULL);
730         NTTIME now;
731         const struct GUID *our_invocation_id;
732         int ret;
733         const char *attrs[] = { "replPropertyMetaData" , NULL };
734         struct ldb_result *res;
735         struct ldb_context *ldb;
736
737         ldb = ldb_module_get_ctx(module);
738
739         our_invocation_id = samdb_ntds_invocation_id(ldb);
740         if (!our_invocation_id) {
741                 /* this happens during an initial vampire while
742                    updating the schema */
743                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
744                 return LDB_SUCCESS;
745         }
746
747         unix_to_nt_time(&now, t);
748
749         /* search for the existing replPropertyMetaDataBlob */
750         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
751         if (ret != LDB_SUCCESS || res->count != 1) {
752                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
753                          ldb_dn_get_linearized(msg->dn)));
754                 return LDB_ERR_OPERATIONS_ERROR;
755         }
756                 
757
758         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
759         if (!omd_value) {
760                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
761                          ldb_dn_get_linearized(msg->dn)));
762                 return LDB_ERR_OPERATIONS_ERROR;
763         }
764
765         ndr_err = ndr_pull_struct_blob(omd_value, msg,
766                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
767                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
768         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
769                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
770                          ldb_dn_get_linearized(msg->dn)));
771                 return LDB_ERR_OPERATIONS_ERROR;
772         }
773
774         if (omd.version != 1) {
775                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
776                          omd.version, ldb_dn_get_linearized(msg->dn)));
777                 return LDB_ERR_OPERATIONS_ERROR;
778         }
779
780         for (i=0; i<msg->num_elements; i++) {
781                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
782                                                  our_invocation_id, now);
783                 if (ret != LDB_SUCCESS) {
784                         return ret;
785                 }
786         }
787
788         /*
789          * replmd_update_rpmd_element has done an update if the
790          * seq_num is set
791          */
792         if (*seq_num != 0) {
793                 struct ldb_val *md_value;
794                 struct ldb_message_element *el;
795
796                 md_value = talloc(msg, struct ldb_val);
797                 if (md_value == NULL) {
798                         ldb_oom(ldb);
799                         return LDB_ERR_OPERATIONS_ERROR;
800                 }
801
802                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
803                 if (ret != LDB_SUCCESS) {
804                         return ret;
805                 }
806
807                 ndr_err = ndr_push_struct_blob(md_value, msg, 
808                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
809                                                &omd,
810                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
811                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
812                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
813                                  ldb_dn_get_linearized(msg->dn)));
814                         return LDB_ERR_OPERATIONS_ERROR;
815                 }
816
817                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
818                 if (ret != LDB_SUCCESS) {
819                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
820                                  ldb_dn_get_linearized(msg->dn)));
821                         return ret;
822                 }
823
824                 el->num_values = 1;
825                 el->values = md_value;
826         }
827
828         return LDB_SUCCESS;     
829 }
830
831
832 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
833 {
834         struct ldb_context *ldb;
835         struct replmd_replicated_request *ac;
836         struct ldb_request *down_req;
837         struct ldb_message *msg;
838         struct ldb_result *res;
839         time_t t = time(NULL);
840         int ret;
841
842         /* do not manipulate our control entries */
843         if (ldb_dn_is_special(req->op.mod.message->dn)) {
844                 return ldb_next_request(module, req);
845         }
846
847         ldb = ldb_module_get_ctx(module);
848
849         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
850
851         ac = replmd_ctx_init(module, req);
852         if (!ac) {
853                 return LDB_ERR_OPERATIONS_ERROR;
854         }
855
856         /* we have to copy the message as the caller might have it as a const */
857         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
858         if (msg == NULL) {
859                 ldb_oom(ldb);
860                 talloc_free(ac);
861                 return LDB_ERR_OPERATIONS_ERROR;
862         }
863
864         /* TODO:
865          * - give an error when a readonly attribute should
866          *   be modified
867          * - merge the changed into the old object
868          *   if the caller set values to the same value
869          *   ignore the attribute, return success when no
870          *   attribute was changed
871          */
872
873         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
874         if (ret != LDB_SUCCESS) {
875                 talloc_free(ac);
876                 return ret;
877         }
878
879         ret = replmd_update_rpmd(module, ac->schema, msg, &ac->seq_num);
880         if (ret != LDB_SUCCESS) {
881                 talloc_free(ac);
882                 return ret;
883         }
884
885         /* TODO:
886          * - replace the old object with the newly constructed one
887          */
888
889         ret = ldb_build_mod_req(&down_req, ldb, ac,
890                                 msg,
891                                 req->controls,
892                                 ac, replmd_op_callback,
893                                 req);
894         if (ret != LDB_SUCCESS) {
895                 talloc_free(ac);
896                 return ret;
897         }
898         talloc_steal(down_req, msg);
899
900         /* we only change whenChanged and uSNChanged if the seq_num
901            has changed */
902         if (ac->seq_num != 0) {
903                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
904                         talloc_free(ac);
905                         return ret;
906                 }
907
908                 if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
909                         talloc_free(ac);
910                         return ret;
911                 }
912         }
913
914         /* go on with the call chain */
915         return ldb_next_request(module, down_req);
916 }
917
918 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
919
920 /*
921   handle a rename request
922
923   On a rename we need to do an extra ldb_modify which sets the
924   whenChanged and uSNChanged attributes.  We do this in a callback after the success.
925  */
926 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
927 {
928         struct ldb_context *ldb;
929         struct replmd_replicated_request *ac;
930         int ret;
931         struct ldb_request *down_req;
932
933         /* do not manipulate our control entries */
934         if (ldb_dn_is_special(req->op.mod.message->dn)) {
935                 return ldb_next_request(module, req);
936         }
937
938         ldb = ldb_module_get_ctx(module);
939
940         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
941
942         ac = replmd_ctx_init(module, req);
943         if (!ac) {
944                 return LDB_ERR_OPERATIONS_ERROR;
945         }
946         ret = ldb_build_rename_req(&down_req, ldb, ac,
947                                    ac->req->op.rename.olddn,
948                                    ac->req->op.rename.newdn,
949                                    ac->req->controls,
950                                    ac, replmd_rename_callback,
951                                    ac->req);
952
953         if (ret != LDB_SUCCESS) {
954                 talloc_free(ac);
955                 return ret;
956         }
957
958         /* go on with the call chain */
959         return ldb_next_request(module, down_req);
960 }
961
962 /* After the rename is compleated, update the whenchanged etc */
963 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
964 {
965         struct ldb_context *ldb;
966         struct replmd_replicated_request *ac;
967         struct ldb_request *down_req;
968         struct ldb_message *msg;
969         time_t t = time(NULL);
970         int ret;
971
972         ac = talloc_get_type(req->context, struct replmd_replicated_request);
973         ldb = ldb_module_get_ctx(ac->module);
974
975         if (ares->error != LDB_SUCCESS) {
976                 return ldb_module_done(ac->req, ares->controls,
977                                         ares->response, ares->error);
978         }
979
980         if (ares->type != LDB_REPLY_DONE) {
981                 ldb_set_errstring(ldb,
982                                   "invalid ldb_reply_type in callback");
983                 talloc_free(ares);
984                 return ldb_module_done(ac->req, NULL, NULL,
985                                         LDB_ERR_OPERATIONS_ERROR);
986         }
987
988         /* Get a sequence number from the backend */
989         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
990         if (ret != LDB_SUCCESS) {
991                 return ret;
992         }
993
994         /* TODO:
995          * - replace the old object with the newly constructed one
996          */
997
998         msg = ldb_msg_new(ac);
999         if (msg == NULL) {
1000                 ldb_oom(ldb);
1001                 return LDB_ERR_OPERATIONS_ERROR;
1002         }
1003
1004         msg->dn = ac->req->op.rename.newdn;
1005
1006         ret = ldb_build_mod_req(&down_req, ldb, ac,
1007                                 msg,
1008                                 req->controls,
1009                                 ac, replmd_op_callback,
1010                                 req);
1011
1012         if (ret != LDB_SUCCESS) {
1013                 talloc_free(ac);
1014                 return ret;
1015         }
1016         talloc_steal(down_req, msg);
1017
1018         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1019                 talloc_free(ac);
1020                 return ret;
1021         }
1022         
1023         if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1024                 talloc_free(ac);
1025                 return ret;
1026         }
1027
1028         /* go on with the call chain - do the modify after the rename */
1029         return ldb_next_request(ac->module, down_req);
1030 }
1031
1032
/*
  Error hook for the replicated-objects code path: currently hands the
  ldb error code back unchanged; the request context is not consulted.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	(void)ar;

	return ret;
}
1037
1038 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1039 {
1040         int ret = LDB_ERR_OTHER;
1041         /* TODO: do some error mapping */
1042         return ret;
1043 }
1044
1045 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1046 {
1047         struct ldb_context *ldb;
1048         struct ldb_request *change_req;
1049         enum ndr_err_code ndr_err;
1050         struct ldb_message *msg;
1051         struct replPropertyMetaDataBlob *md;
1052         struct ldb_val md_value;
1053         uint32_t i;
1054         int ret;
1055
1056         /*
1057          * TODO: check if the parent object exist
1058          */
1059
1060         /*
1061          * TODO: handle the conflict case where an object with the
1062          *       same name exist
1063          */
1064
1065         ldb = ldb_module_get_ctx(ar->module);
1066         msg = ar->objs->objects[ar->index_current].msg;
1067         md = ar->objs->objects[ar->index_current].meta_data;
1068
1069         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1070         if (ret != LDB_SUCCESS) {
1071                 return replmd_replicated_request_error(ar, ret);
1072         }
1073
1074         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1075         if (ret != LDB_SUCCESS) {
1076                 return replmd_replicated_request_error(ar, ret);
1077         }
1078
1079         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1080         if (ret != LDB_SUCCESS) {
1081                 return replmd_replicated_request_error(ar, ret);
1082         }
1083
1084         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
1085         if (ret != LDB_SUCCESS) {
1086                 return replmd_replicated_request_error(ar, ret);
1087         }
1088
1089         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1090         if (ret != LDB_SUCCESS) {
1091                 return replmd_replicated_request_error(ar, ret);
1092         }
1093
1094         /* remove any message elements that have zero values */
1095         for (i=0; i<msg->num_elements; i++) {
1096                 if (msg->elements[i].num_values == 0) {
1097                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1098                                  msg->elements[i].name));
1099                         memmove(&msg->elements[i], 
1100                                 &msg->elements[i+1], 
1101                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1102                         msg->num_elements--;
1103                         i--;
1104                 }
1105         }
1106         
1107         /*
1108          * the meta data array is already sorted by the caller
1109          */
1110         for (i=0; i < md->ctr.ctr1.count; i++) {
1111                 md->ctr.ctr1.array[i].local_usn = ar->seq_num;
1112         }
1113         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1114                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1115                                        md,
1116                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1117         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1118                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1119                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1120         }
1121         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1122         if (ret != LDB_SUCCESS) {
1123                 return replmd_replicated_request_error(ar, ret);
1124         }
1125
1126         replmd_ldb_message_sort(msg, ar->schema);
1127
1128         if (DEBUGLVL(4)) {
1129                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1130                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1131                 talloc_free(s);
1132         }
1133
1134         ret = ldb_build_add_req(&change_req,
1135                                 ldb,
1136                                 ar,
1137                                 msg,
1138                                 ar->controls,
1139                                 ar,
1140                                 replmd_op_callback,
1141                                 ar->req);
1142         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1143
1144         return ldb_next_request(ar->module, change_req);
1145 }
1146
1147 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1148                                                          struct replPropertyMetaData1 *m2)
1149 {
1150         int ret;
1151
1152         if (m1->version != m2->version) {
1153                 return m1->version - m2->version;
1154         }
1155
1156         if (m1->originating_change_time != m2->originating_change_time) {
1157                 return m1->originating_change_time - m2->originating_change_time;
1158         }
1159
1160         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1161         if (ret != 0) {
1162                 return ret;
1163         }
1164
1165         return m1->originating_usn - m2->originating_usn;
1166 }
1167
1168 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1169 {
1170         struct ldb_context *ldb;
1171         struct ldb_request *change_req;
1172         enum ndr_err_code ndr_err;
1173         struct ldb_message *msg;
1174         struct replPropertyMetaDataBlob *rmd;
1175         struct replPropertyMetaDataBlob omd;
1176         const struct ldb_val *omd_value;
1177         struct replPropertyMetaDataBlob nmd;
1178         struct ldb_val nmd_value;
1179         uint32_t i,j,ni=0;
1180         uint32_t removed_attrs = 0;
1181         int ret;
1182
1183         ldb = ldb_module_get_ctx(ar->module);
1184         msg = ar->objs->objects[ar->index_current].msg;
1185         rmd = ar->objs->objects[ar->index_current].meta_data;
1186         ZERO_STRUCT(omd);
1187         omd.version = 1;
1188
1189         /*
1190          * TODO: check repl data is correct after a rename
1191          */
1192         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1193                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1194                           ldb_dn_get_linearized(ar->search_msg->dn),
1195                           ldb_dn_get_linearized(msg->dn));
1196                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1197                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1198                                   ldb_dn_get_linearized(ar->search_msg->dn),
1199                                   ldb_dn_get_linearized(msg->dn),
1200                                   ldb_errstring(ldb));
1201                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1202                 }
1203         }
1204
1205         /* find existing meta data */
1206         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1207         if (omd_value) {
1208                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1209                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1210                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1211                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1212                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1213                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1214                 }
1215
1216                 if (omd.version != 1) {
1217                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1218                 }
1219         }
1220
1221         ZERO_STRUCT(nmd);
1222         nmd.version = 1;
1223         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1224         nmd.ctr.ctr1.array = talloc_array(ar,
1225                                           struct replPropertyMetaData1,
1226                                           nmd.ctr.ctr1.count);
1227         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1228
1229         /* first copy the old meta data */
1230         for (i=0; i < omd.ctr.ctr1.count; i++) {
1231                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1232                 ni++;
1233         }
1234
1235         /* now merge in the new meta data */
1236         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1237                 bool found = false;
1238
1239                 for (j=0; j < ni; j++) {
1240                         int cmp;
1241
1242                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1243                                 continue;
1244                         }
1245
1246                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1247                                                                             &nmd.ctr.ctr1.array[j]);
1248                         if (cmp > 0) {
1249                                 /* replace the entry */
1250                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1251                                 found = true;
1252                                 break;
1253                         }
1254
1255                         /* we don't want to apply this change so remove the attribute */
1256                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1257                         removed_attrs++;
1258
1259                         found = true;
1260                         break;
1261                 }
1262
1263                 if (found) continue;
1264
1265                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1266                 ni++;
1267         }
1268
1269         /*
1270          * finally correct the size of the meta_data array
1271          */
1272         nmd.ctr.ctr1.count = ni;
1273
1274         /*
1275          * the rdn attribute (the alias for the name attribute),
1276          * 'cn' for most objects is the last entry in the meta data array
1277          * we have stored
1278          *
1279          * sort the new meta data array
1280          */
1281         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
1282         if (ret != LDB_SUCCESS) {
1283                 return ret;
1284         }
1285
1286         /*
1287          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1288          */
1289         if (msg->num_elements == 0) {
1290                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1291                           ar->index_current);
1292
1293                 ar->index_current++;
1294                 return replmd_replicated_apply_next(ar);
1295         }
1296
1297         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1298                   ar->index_current, msg->num_elements);
1299
1300         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1301         if (ret != LDB_SUCCESS) {
1302                 return replmd_replicated_request_error(ar, ret);
1303         }
1304
1305         for (i=0; i<ni; i++) {
1306                 nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
1307         }
1308
1309         /* create the meta data value */
1310         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1311                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1312                                        &nmd,
1313                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1314         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1315                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1316                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1317         }
1318
1319         /*
1320          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1321          * and replPopertyMetaData attributes
1322          */
1323         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1324         if (ret != LDB_SUCCESS) {
1325                 return replmd_replicated_request_error(ar, ret);
1326         }
1327         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1328         if (ret != LDB_SUCCESS) {
1329                 return replmd_replicated_request_error(ar, ret);
1330         }
1331         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1332         if (ret != LDB_SUCCESS) {
1333                 return replmd_replicated_request_error(ar, ret);
1334         }
1335
1336         replmd_ldb_message_sort(msg, ar->schema);
1337
1338         /* we want to replace the old values */
1339         for (i=0; i < msg->num_elements; i++) {
1340                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1341         }
1342
1343         if (DEBUGLVL(4)) {
1344                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1345                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1346                 talloc_free(s);
1347         }
1348
1349         ret = ldb_build_mod_req(&change_req,
1350                                 ldb,
1351                                 ar,
1352                                 msg,
1353                                 ar->controls,
1354                                 ar,
1355                                 replmd_op_callback,
1356                                 ar->req);
1357         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1358
1359         return ldb_next_request(ar->module, change_req);
1360 }
1361
1362 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1363                                                    struct ldb_reply *ares)
1364 {
1365         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1366                                                struct replmd_replicated_request);
1367         int ret;
1368
1369         if (!ares) {
1370                 return ldb_module_done(ar->req, NULL, NULL,
1371                                         LDB_ERR_OPERATIONS_ERROR);
1372         }
1373         if (ares->error != LDB_SUCCESS &&
1374             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1375                 return ldb_module_done(ar->req, ares->controls,
1376                                         ares->response, ares->error);
1377         }
1378
1379         switch (ares->type) {
1380         case LDB_REPLY_ENTRY:
1381                 ar->search_msg = talloc_steal(ar, ares->message);
1382                 break;
1383
1384         case LDB_REPLY_REFERRAL:
1385                 /* we ignore referrals */
1386                 break;
1387
1388         case LDB_REPLY_DONE:
1389                 if (ar->search_msg != NULL) {
1390                         ret = replmd_replicated_apply_merge(ar);
1391                 } else {
1392                         ret = replmd_replicated_apply_add(ar);
1393                 }
1394                 if (ret != LDB_SUCCESS) {
1395                         return ldb_module_done(ar->req, NULL, NULL, ret);
1396                 }
1397         }
1398
1399         talloc_free(ares);
1400         return LDB_SUCCESS;
1401 }
1402
1403 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1404
1405 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1406 {
1407         struct ldb_context *ldb;
1408         int ret;
1409         char *tmp_str;
1410         char *filter;
1411         struct ldb_request *search_req;
1412         struct ldb_search_options_control *options;
1413
1414         if (ar->index_current >= ar->objs->num_objects) {
1415                 /* done with it, go to next stage */
1416                 return replmd_replicated_uptodate_vector(ar);
1417         }
1418
1419         ldb = ldb_module_get_ctx(ar->module);
1420         ar->search_msg = NULL;
1421
1422         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1423         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1424
1425         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1426         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1427         talloc_free(tmp_str);
1428
1429         ret = ldb_build_search_req(&search_req,
1430                                    ldb,
1431                                    ar,
1432                                    NULL,
1433                                    LDB_SCOPE_SUBTREE,
1434                                    filter,
1435                                    NULL,
1436                                    NULL,
1437                                    ar,
1438                                    replmd_replicated_apply_search_callback,
1439                                    ar->req);
1440
1441         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
1442         if (ret != LDB_SUCCESS) {
1443                 return ret;
1444         }
1445
1446         /* we need to cope with cross-partition links, so search for
1447            the GUID over all partitions */
1448         options = talloc(search_req, struct ldb_search_options_control);
1449         if (options == NULL) {
1450                 DEBUG(0, (__location__ ": out of memory\n"));
1451                 return LDB_ERR_OPERATIONS_ERROR;
1452         }
1453         options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
1454
1455         ret = ldb_request_add_control(search_req,
1456                                       LDB_CONTROL_SEARCH_OPTIONS_OID,
1457                                       true, options);
1458         if (ret != LDB_SUCCESS) {
1459                 return ret;
1460         }
1461
1462         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1463
1464         return ldb_next_request(ar->module, search_req);
1465 }
1466
1467 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1468                                                       struct ldb_reply *ares)
1469 {
1470         struct ldb_context *ldb;
1471         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1472                                                struct replmd_replicated_request);
1473         ldb = ldb_module_get_ctx(ar->module);
1474
1475         if (!ares) {
1476                 return ldb_module_done(ar->req, NULL, NULL,
1477                                         LDB_ERR_OPERATIONS_ERROR);
1478         }
1479         if (ares->error != LDB_SUCCESS) {
1480                 return ldb_module_done(ar->req, ares->controls,
1481                                         ares->response, ares->error);
1482         }
1483
1484         if (ares->type != LDB_REPLY_DONE) {
1485                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1486                 return ldb_module_done(ar->req, NULL, NULL,
1487                                         LDB_ERR_OPERATIONS_ERROR);
1488         }
1489
1490         talloc_free(ares);
1491
1492         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1493 }
1494
1495 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1496 {
1497         struct ldb_context *ldb;
1498         struct ldb_request *change_req;
1499         enum ndr_err_code ndr_err;
1500         struct ldb_message *msg;
1501         struct replUpToDateVectorBlob ouv;
1502         const struct ldb_val *ouv_value;
1503         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1504         struct replUpToDateVectorBlob nuv;
1505         struct ldb_val nuv_value;
1506         struct ldb_message_element *nuv_el = NULL;
1507         const struct GUID *our_invocation_id;
1508         struct ldb_message_element *orf_el = NULL;
1509         struct repsFromToBlob nrf;
1510         struct ldb_val *nrf_value = NULL;
1511         struct ldb_message_element *nrf_el = NULL;
1512         uint32_t i,j,ni=0;
1513         bool found = false;
1514         time_t t = time(NULL);
1515         NTTIME now;
1516         int ret;
1517
1518         ldb = ldb_module_get_ctx(ar->module);
1519         ruv = ar->objs->uptodateness_vector;
1520         ZERO_STRUCT(ouv);
1521         ouv.version = 2;
1522         ZERO_STRUCT(nuv);
1523         nuv.version = 2;
1524
1525         unix_to_nt_time(&now, t);
1526
1527         /*
1528          * first create the new replUpToDateVector
1529          */
1530         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1531         if (ouv_value) {
1532                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1533                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1534                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1535                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1536                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1537                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1538                 }
1539
1540                 if (ouv.version != 2) {
1541                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1542                 }
1543         }
1544
1545         /*
1546          * the new uptodateness vector will at least
1547          * contain 1 entry, one for the source_dsa
1548          *
1549          * plus optional values from our old vector and the one from the source_dsa
1550          */
1551         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1552         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1553         nuv.ctr.ctr2.cursors = talloc_array(ar,
1554                                             struct drsuapi_DsReplicaCursor2,
1555                                             nuv.ctr.ctr2.count);
1556         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1557
1558         /* first copy the old vector */
1559         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1560                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1561                 ni++;
1562         }
1563
1564         /* get our invocation_id if we have one already attached to the ldb */
1565         our_invocation_id = samdb_ntds_invocation_id(ldb);
1566
1567         /* merge in the source_dsa vector is available */
1568         for (i=0; (ruv && i < ruv->count); i++) {
1569                 found = false;
1570
1571                 if (our_invocation_id &&
1572                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1573                                our_invocation_id)) {
1574                         continue;
1575                 }
1576
1577                 for (j=0; j < ni; j++) {
1578                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1579                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1580                                 continue;
1581                         }
1582
1583                         found = true;
1584
1585                         /*
1586                          * we update only the highest_usn and not the latest_sync_success time,
1587                          * because the last success stands for direct replication
1588                          */
1589                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1590                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1591                         }
1592                         break;                  
1593                 }
1594
1595                 if (found) continue;
1596
1597                 /* if it's not there yet, add it */
1598                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1599                 ni++;
1600         }
1601
1602         /*
1603          * merge in the current highwatermark for the source_dsa
1604          */
1605         found = false;
1606         for (j=0; j < ni; j++) {
1607                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1608                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1609                         continue;
1610                 }
1611
1612                 found = true;
1613
1614                 /*
1615                  * here we update the highest_usn and last_sync_success time
1616                  * because we're directly replicating from the source_dsa
1617                  *
1618                  * and use the tmp_highest_usn because this is what we have just applied
1619                  * to our ldb
1620                  */
1621                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1622                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1623                 break;
1624         }
1625         if (!found) {
1626                 /*
1627                  * here we update the highest_usn and last_sync_success time
1628                  * because we're directly replicating from the source_dsa
1629                  *
1630                  * and use the tmp_highest_usn because this is what we have just applied
1631                  * to our ldb
1632                  */
1633                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1634                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1635                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1636                 ni++;
1637         }
1638
1639         /*
1640          * finally correct the size of the cursors array
1641          */
1642         nuv.ctr.ctr2.count = ni;
1643
1644         /*
1645          * sort the cursors
1646          */
1647         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1648               sizeof(struct drsuapi_DsReplicaCursor2),
1649               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1650
1651         /*
1652          * create the change ldb_message
1653          */
1654         msg = ldb_msg_new(ar);
1655         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1656         msg->dn = ar->search_msg->dn;
1657
1658         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1659                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1660                                        &nuv,
1661                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1662         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1663                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1664                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1665         }
1666         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1667         if (ret != LDB_SUCCESS) {
1668                 return replmd_replicated_request_error(ar, ret);
1669         }
1670         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1671
1672         /*
1673          * now create the new repsFrom value from the given repsFromTo1 structure
1674          */
1675         ZERO_STRUCT(nrf);
1676         nrf.version                                     = 1;
1677         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1678         /* and fix some values... */
1679         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1680         nrf.ctr.ctr1.last_success                       = now;
1681         nrf.ctr.ctr1.last_attempt                       = now;
1682         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1683         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1684
1685         /*
1686          * first see if we already have a repsFrom value for the current source dsa
1687          * if so we'll later replace this value
1688          */
1689         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1690         if (orf_el) {
1691                 for (i=0; i < orf_el->num_values; i++) {
1692                         struct repsFromToBlob *trf;
1693
1694                         trf = talloc(ar, struct repsFromToBlob);
1695                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1696
1697                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1698                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1699                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1700                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1701                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1702                         }
1703
1704                         if (trf->version != 1) {
1705                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1706                         }
1707
1708                         /*
1709                          * we compare the source dsa objectGUID not the invocation_id
1710                          * because we want only one repsFrom value per source dsa
1711                          * and when the invocation_id of the source dsa has changed we don't need 
1712                          * the old repsFrom with the old invocation_id
1713                          */
1714                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1715                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1716                                 talloc_free(trf);
1717                                 continue;
1718                         }
1719
1720                         talloc_free(trf);
1721                         nrf_value = &orf_el->values[i];
1722                         break;
1723                 }
1724
1725                 /*
1726                  * copy over all old values to the new ldb_message
1727                  */
1728                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1729                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1730                 *nrf_el = *orf_el;
1731         }
1732
1733         /*
1734          * if we haven't found an old repsFrom value for the current source dsa
1735          * we'll add a new value
1736          */
1737         if (!nrf_value) {
1738                 struct ldb_val zero_value;
1739                 ZERO_STRUCT(zero_value);
1740                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1741                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1742
1743                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1744         }
1745
1746         /* we now fill the value which is already attached to ldb_message */
1747         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1748                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1749                                        &nrf,
1750                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1751         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1752                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1753                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1754         }
1755
1756         /* 
1757          * the ldb_message_element for the attribute, has all the old values and the new one
1758          * so we'll replace the whole attribute with all values
1759          */
1760         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1761
1762         if (DEBUGLVL(4)) {
1763                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1764                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1765                 talloc_free(s);
1766         }
1767
1768         /* prepare the ldb_modify() request */
1769         ret = ldb_build_mod_req(&change_req,
1770                                 ldb,
1771                                 ar,
1772                                 msg,
1773                                 ar->controls,
1774                                 ar,
1775                                 replmd_replicated_uptodate_modify_callback,
1776                                 ar->req);
1777         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1778
1779         return ldb_next_request(ar->module, change_req);
1780 }
1781
1782 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1783                                                       struct ldb_reply *ares)
1784 {
1785         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1786                                                struct replmd_replicated_request);
1787         int ret;
1788
1789         if (!ares) {
1790                 return ldb_module_done(ar->req, NULL, NULL,
1791                                         LDB_ERR_OPERATIONS_ERROR);
1792         }
1793         if (ares->error != LDB_SUCCESS &&
1794             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1795                 return ldb_module_done(ar->req, ares->controls,
1796                                         ares->response, ares->error);
1797         }
1798
1799         switch (ares->type) {
1800         case LDB_REPLY_ENTRY:
1801                 ar->search_msg = talloc_steal(ar, ares->message);
1802                 break;
1803
1804         case LDB_REPLY_REFERRAL:
1805                 /* we ignore referrals */
1806                 break;
1807
1808         case LDB_REPLY_DONE:
1809                 if (ar->search_msg == NULL) {
1810                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1811                 } else {
1812                         ret = replmd_replicated_uptodate_modify(ar);
1813                 }
1814                 if (ret != LDB_SUCCESS) {
1815                         return ldb_module_done(ar->req, NULL, NULL, ret);
1816                 }
1817         }
1818
1819         talloc_free(ares);
1820         return LDB_SUCCESS;
1821 }
1822
1823
1824 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1825 {
1826         struct ldb_context *ldb;
1827         int ret;
1828         static const char *attrs[] = {
1829                 "replUpToDateVector",
1830                 "repsFrom",
1831                 NULL
1832         };
1833         struct ldb_request *search_req;
1834
1835         ldb = ldb_module_get_ctx(ar->module);
1836         ar->search_msg = NULL;
1837
1838         ret = ldb_build_search_req(&search_req,
1839                                    ldb,
1840                                    ar,
1841                                    ar->objs->partition_dn,
1842                                    LDB_SCOPE_BASE,
1843                                    "(objectClass=*)",
1844                                    attrs,
1845                                    NULL,
1846                                    ar,
1847                                    replmd_replicated_uptodate_search_callback,
1848                                    ar->req);
1849         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1850
1851         return ldb_next_request(ar->module, search_req);
1852 }
1853
1854
1855
1856 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1857 {
1858         struct ldb_context *ldb;
1859         struct dsdb_extended_replicated_objects *objs;
1860         struct replmd_replicated_request *ar;
1861         struct ldb_control **ctrls;
1862         int ret, i;
1863         struct replmd_private *replmd_private = 
1864                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1865
1866         ldb = ldb_module_get_ctx(module);
1867
1868         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1869
1870         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1871         if (!objs) {
1872                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1873                 return LDB_ERR_PROTOCOL_ERROR;
1874         }
1875
1876         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1877                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1878                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1879                 return LDB_ERR_PROTOCOL_ERROR;
1880         }
1881
1882         ar = replmd_ctx_init(module, req);
1883         if (!ar)
1884                 return LDB_ERR_OPERATIONS_ERROR;
1885
1886         /* Set the flags to have the replmd_op_callback run over the full set of objects */
1887         ar->apply_mode = true;
1888         ar->objs = objs;
1889         ar->schema = dsdb_get_schema(ldb);
1890         if (!ar->schema) {
1891                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1892                 talloc_free(ar);
1893                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
1894                 return LDB_ERR_CONSTRAINT_VIOLATION;
1895         }
1896
1897         ctrls = req->controls;
1898
1899         if (req->controls) {
1900                 req->controls = talloc_memdup(ar, req->controls,
1901                                               talloc_get_size(req->controls));
1902                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1903         }
1904
1905         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1906         if (ret != LDB_SUCCESS) {
1907                 return ret;
1908         }
1909
1910         ar->controls = req->controls;
1911         req->controls = ctrls;
1912
1913         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1914
1915         /* save away the linked attributes for the end of the
1916            transaction */
1917         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1918                 struct la_entry *la_entry;
1919
1920                 if (replmd_private->la_ctx == NULL) {
1921                         replmd_private->la_ctx = talloc_new(replmd_private);
1922                 }
1923                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
1924                 if (la_entry == NULL) {
1925                         ldb_oom(ldb);
1926                         return LDB_ERR_OPERATIONS_ERROR;
1927                 }
1928                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1929                 if (la_entry->la == NULL) {
1930                         talloc_free(la_entry);
1931                         ldb_oom(ldb);
1932                         return LDB_ERR_OPERATIONS_ERROR;
1933                 }
1934                 *la_entry->la = ar->objs->linked_attributes[i];
1935
1936                 /* we need to steal the non-scalars so they stay
1937                    around until the end of the transaction */
1938                 talloc_steal(la_entry->la, la_entry->la->identifier);
1939                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1940
1941                 DLIST_ADD(replmd_private->la_list, la_entry);
1942         }
1943
1944         return replmd_replicated_apply_next(ar);
1945 }
1946
1947 /*
1948   process one linked attribute structure
1949  */
1950 static int replmd_process_linked_attribute(struct ldb_module *module,
1951                                            struct la_entry *la_entry)
1952 {                                          
1953         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1954         struct ldb_context *ldb = ldb_module_get_ctx(module);
1955         struct dsdb_schema *schema = dsdb_get_schema(ldb);
1956         struct drsuapi_DsReplicaObjectIdentifier3 target;
1957         struct ldb_message *msg;
1958         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1959         struct ldb_request *mod_req;
1960         int ret;
1961         const struct dsdb_attribute *attr;
1962         struct ldb_dn *target_dn;
1963         struct dsdb_dn *dsdb_dn;
1964         uint64_t seq_num = 0;
1965         struct drsuapi_DsReplicaAttribute drs;
1966         struct drsuapi_DsAttributeValue val;
1967         struct ldb_message_element el;
1968         const struct ldb_val *guid;
1969         WERROR status;
1970
1971         drs.value_ctr.num_values = 1;
1972         drs.value_ctr.values = &val;
1973         val.blob = la->value.blob;
1974
1975 /*
1976 linked_attributes[0]:                                                     
1977      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1978         identifier               : *                                      
1979             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1980                 __ndr_size               : 0x0000003a (58)                
1981                 __ndr_size_sid           : 0x00000000 (0)                 
1982                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1983                 sid                      : S-0-0                               
1984                 __ndr_size_dn            : 0x00000000 (0)                      
1985                 dn                       : ''                                  
1986         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1987         value: struct drsuapi_DsAttributeValue                                 
1988             __ndr_size               : 0x0000007e (126)                        
1989             blob                     : *                                       
1990                 blob                     : DATA_BLOB length=126                
1991         flags                    : 0x00000001 (1)                              
1992                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1993         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1994         meta_data: struct drsuapi_DsReplicaMetaData                            
1995             version                  : 0x00000015 (21)                         
1996             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
1997             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
1998             originating_usn          : 0x000000000001e19c (123292)             
1999
2000 (for cases where the link is to a normal DN)
2001      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2002         __ndr_size               : 0x0000007e (126)                            
2003         __ndr_size_sid           : 0x0000001c (28)                             
2004         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2005         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2006         __ndr_size_dn            : 0x00000022 (34)                                
2007         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2008  */
2009         
2010         /* find the attribute being modified */
2011         attr = dsdb_attribute_by_attributeID_id(schema, la->attid);
2012         if (attr == NULL) {
2013                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2014                 talloc_free(tmp_ctx);
2015                 return LDB_ERR_OPERATIONS_ERROR;
2016         }
2017
2018         status = attr->syntax->drsuapi_to_ldb(ldb, schema, attr, &drs, tmp_ctx, &el);
2019
2020         /* construct a modify request for this attribute change */
2021         msg = ldb_msg_new(tmp_ctx);
2022         if (!msg) {
2023                 ldb_oom(ldb);
2024                 talloc_free(tmp_ctx);
2025                 return LDB_ERR_OPERATIONS_ERROR;
2026         }
2027
2028         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2029                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2030         if (ret != LDB_SUCCESS) {
2031                 talloc_free(tmp_ctx);
2032                 return ret;
2033         }
2034
2035         el.name = attr->lDAPDisplayName;
2036         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2037                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_ADD);
2038         } else {
2039                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_DELETE);
2040         }
2041         if (ret != LDB_SUCCESS) {
2042                 talloc_free(tmp_ctx);
2043                 return ret;
2044         }
2045
2046         dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, &el.values[0], attr->syntax->ldap_oid);
2047         if (!dsdb_dn) {
2048                 DEBUG(0,(__location__ ": Failed to parse just-generated DN\n"));
2049                 talloc_free(tmp_ctx);
2050                 return LDB_ERR_INVALID_DN_SYNTAX;
2051         }
2052
2053         guid = ldb_dn_get_extended_component(dsdb_dn->dn, "GUID");
2054         if (!guid) {
2055                 DEBUG(0,(__location__ ": Failed to parse GUID from just-generated DN\n"));
2056                 talloc_free(tmp_ctx);
2057                 return LDB_ERR_INVALID_DN_SYNTAX;
2058         }
2059
2060         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, ldb_binary_encode(tmp_ctx, *guid), &target_dn);
2061         if (ret != LDB_SUCCESS) {
2062                 /* If this proves to be a problem in the future, then
2063                  * just remove the return - perhaps we can just use
2064                  * the details the replication peer supplied */
2065
2066                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
2067                 talloc_free(tmp_ctx);
2068                 return LDB_ERR_OPERATIONS_ERROR;
2069         } else {
2070
2071                 /* Now update with full DN we just found in the DB (including extended components) */
2072                 dsdb_dn->dn = target_dn;
2073                 /* Now make a linearized version, using the original binary components (if any) */
2074                 el.values[0] = data_blob_string_const(dsdb_dn_get_extended_linearized(tmp_ctx, dsdb_dn, 1));
2075         }
2076
2077         ret = replmd_update_rpmd(module, schema, msg, &seq_num);
2078         if (ret != LDB_SUCCESS) {
2079                 talloc_free(tmp_ctx);
2080                 return ret;
2081         }
2082
2083         /* we only change whenChanged and uSNChanged if the seq_num
2084            has changed */
2085         if (seq_num != 0) {
2086                 time_t t = time(NULL);
2087
2088                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
2089                         talloc_free(tmp_ctx);
2090                         return LDB_ERR_OPERATIONS_ERROR;
2091                 }
2092
2093                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
2094                         talloc_free(tmp_ctx);
2095                         return LDB_ERR_OPERATIONS_ERROR;
2096                 }
2097         }
2098
2099         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2100                                 msg,
2101                                 NULL,
2102                                 NULL, 
2103                                 ldb_op_default_callback,
2104                                 NULL);
2105         if (ret != LDB_SUCCESS) {
2106                 talloc_free(tmp_ctx);
2107                 return ret;
2108         }
2109         talloc_steal(mod_req, msg);
2110
2111         if (DEBUGLVL(4)) {
2112                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2113                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2114         }
2115
2116         /* Run the new request */
2117         ret = ldb_next_request(module, mod_req);
2118
2119         /* we need to wait for this to finish, as we are being called
2120            from the synchronous end_transaction hook of this module */
2121         if (ret == LDB_SUCCESS) {
2122                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2123         }
2124
2125         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2126                 /* the link destination exists, we need to update it
2127                  * by deleting the old one for the same DN then adding
2128                  * the new one */
2129                 msg->elements = talloc_realloc(msg, msg->elements,
2130                                                struct ldb_message_element,
2131                                                msg->num_elements+1);
2132                 if (msg->elements == NULL) {
2133                         ldb_oom(ldb);
2134                         talloc_free(tmp_ctx);
2135                         return LDB_ERR_OPERATIONS_ERROR;
2136                 }
2137                 /* this relies on the backend matching the old entry
2138                    only by the DN portion of the extended DN */
2139                 msg->elements[1] = msg->elements[0];
2140                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2141                 msg->num_elements++;
2142
2143                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2144                                         msg,
2145                                         NULL,
2146                                         NULL, 
2147                                         ldb_op_default_callback,
2148                                         NULL);
2149                 if (ret != LDB_SUCCESS) {
2150                         talloc_free(tmp_ctx);
2151                         return ret;
2152                 }
2153
2154                 /* Run the new request */
2155                 ret = ldb_next_request(module, mod_req);
2156                 
2157                 if (ret == LDB_SUCCESS) {
2158                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2159                 }
2160         }
2161
2162         if (ret != LDB_SUCCESS) {
2163                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2164                           ldb_errstring(ldb),
2165                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2166                 ret = LDB_SUCCESS;
2167         }
2168         
2169         talloc_free(tmp_ctx);
2170
2171         return ret;     
2172 }
2173
2174 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2175 {
2176         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2177                 return replmd_extended_replicated_objects(module, req);
2178         }
2179
2180         return ldb_next_request(module, req);
2181 }
2182
2183
2184 /*
2185   we hook into the transaction operations to allow us to 
2186   perform the linked attribute updates at the end of the whole
2187   transaction. This allows a forward linked attribute to be created
2188   before the object is created. During a vampire, w2k8 sends us linked
2189   attributes before the objects they are part of.
2190  */
2191 static int replmd_start_transaction(struct ldb_module *module)
2192 {
2193         /* create our private structure for this transaction */
2194         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2195                                                                 struct replmd_private);
2196         talloc_free(replmd_private->la_ctx);
2197         replmd_private->la_list = NULL;
2198         replmd_private->la_ctx = NULL;
2199
2200         /* free any leftover mod_usn records from cancelled
2201            transactions */
2202         while (replmd_private->ncs) {
2203                 struct nc_entry *e = replmd_private->ncs;
2204                 DLIST_REMOVE(replmd_private->ncs, e);
2205                 talloc_free(e);
2206         }
2207
2208         return ldb_next_start_trans(module);
2209 }
2210
2211 /*
2212   on prepare commit we loop over our queued la_context structures and
2213   apply each of them  
2214  */
2215 static int replmd_prepare_commit(struct ldb_module *module)
2216 {
2217         struct replmd_private *replmd_private = 
2218                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2219         struct la_entry *la, *prev;
2220         int ret;
2221
2222         /* walk the list backwards, to do the first entry first, as we
2223          * added the entries with DLIST_ADD() which puts them at the
2224          * start of the list */
2225         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2226
2227         for (; la; la=prev) {
2228                 prev = la->prev;
2229                 DLIST_REMOVE(replmd_private->la_list, la);
2230                 ret = replmd_process_linked_attribute(module, la);
2231                 if (ret != LDB_SUCCESS) {
2232                         talloc_free(replmd_private->la_ctx);
2233                         replmd_private->la_list = NULL;
2234                         replmd_private->la_ctx = NULL;
2235                         return ret;
2236                 }
2237         }
2238
2239         talloc_free(replmd_private->la_ctx);
2240         replmd_private->la_list = NULL;
2241         replmd_private->la_ctx = NULL;
2242
2243         /* possibly change @REPLCHANGED */
2244         ret = replmd_notify_store(module);
2245         if (ret != LDB_SUCCESS) {
2246                 return ret;
2247         }
2248         
2249         return ldb_next_prepare_commit(module);
2250 }
2251
2252 static int replmd_del_transaction(struct ldb_module *module)
2253 {
2254         struct replmd_private *replmd_private = 
2255                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2256         talloc_free(replmd_private->la_ctx);
2257         replmd_private->la_list = NULL;
2258         replmd_private->la_ctx = NULL;
2259         return ldb_next_del_trans(module);
2260 }
2261
2262
2263 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2264         .name          = "repl_meta_data",
2265         .init_context      = replmd_init,
2266         .add               = replmd_add,
2267         .modify            = replmd_modify,
2268         .rename            = replmd_rename,
2269         .extended          = replmd_extended,
2270         .start_transaction = replmd_start_transaction,
2271         .prepare_commit    = replmd_prepare_commit,
2272         .del_transaction   = replmd_del_transaction,
2273 };