s4-dsdb: make sure mod_usn list is zeroed on each transaction
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         struct nc_entry {
53                 struct nc_entry *prev, *next;
54                 struct ldb_dn *dn;
55                 uint64_t mod_usn;
56         } *ncs;
57 };
58
/*
 * List node carrying a pointer to one drsuapi linked-attribute record.
 */
struct la_entry {
	struct la_entry *next;
	struct la_entry *prev;
	struct drsuapi_DsReplicaLinkedAttribute *la;
};
63
/*
 * Per-request context shared by the operations of this module.
 * Created zeroed by replmd_ctx_init().
 */
struct replmd_replicated_request {
	/* the module instance and the request being served */
	struct ldb_module *module;
	struct ldb_request *req;

	/* schema loaded for the ldb context at context creation time */
	const struct dsdb_schema *schema;

	/* the controls we pass down */
	struct ldb_control **controls;

	/*
	 * State for the mode in which a batch of inbound replication
	 * messages is applied one after the other: index_current is
	 * advanced by replmd_op_callback() after each completed write.
	 */
	bool apply_mode;
	uint32_t index_current;
	struct dsdb_extended_replicated_objects *objs;

	struct ldb_message *search_msg;

	/* sequence number allocated for this change; 0 means none yet */
	uint64_t seq_num;
};
83
84 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
85
86 /*
87   initialise the module
88   allocate the private structure and build the list
89   of partition DNs for use by replmd_notify()
90  */
91 static int replmd_init(struct ldb_module *module)
92 {
93         struct replmd_private *replmd_private;
94         struct ldb_context *ldb = ldb_module_get_ctx(module);
95
96         replmd_private = talloc_zero(module, struct replmd_private);
97         if (replmd_private == NULL) {
98                 ldb_oom(ldb);
99                 return LDB_ERR_OPERATIONS_ERROR;
100         }
101         ldb_module_set_private(module, replmd_private);
102
103         return ldb_next_init(module);
104 }
105
106
107 /*
108  * Callback for most write operations in this module:
109  * 
110  * notify the repl task that a object has changed. The notifies are
111  * gathered up in the replmd_private structure then written to the
112  * @REPLCHANGED object in each partition during the prepare_commit
113  */
114 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
115 {
116         int ret;
117         struct replmd_replicated_request *ac = 
118                 talloc_get_type_abort(req->context, struct replmd_replicated_request);
119         struct replmd_private *replmd_private = 
120                 talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
121         struct nc_entry *modified_partition;
122         struct ldb_control *partition_ctrl;
123         const struct dsdb_control_current_partition *partition;
124
125         struct ldb_control **controls;
126
127         partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
128
129         /* Remove the 'partition' control from what we pass up the chain */
130         controls = controls_except_specified(ares->controls, ares, partition_ctrl);
131
132         if (ares->error != LDB_SUCCESS) {
133                 return ldb_module_done(ac->req, controls,
134                                         ares->response, ares->error);
135         }
136
137         if (ares->type != LDB_REPLY_DONE) {
138                 ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
139                 return ldb_module_done(ac->req, NULL,
140                                        NULL, LDB_ERR_OPERATIONS_ERROR);
141         }
142
143         if (!partition_ctrl) {
144                 return ldb_module_done(ac->req, NULL,
145                                        NULL, LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition = talloc_get_type_abort(partition_ctrl->data,
149                                     struct dsdb_control_current_partition);
150         
151         if (ac->seq_num > 0) {
152                 for (modified_partition = replmd_private->ncs; modified_partition; 
153                      modified_partition = modified_partition->next) {
154                         if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
155                                 break;
156                         }
157                 }
158                 
159                 if (modified_partition == NULL) {
160                         modified_partition = talloc_zero(replmd_private, struct nc_entry);
161                         if (!modified_partition) {
162                                 ldb_oom(ldb_module_get_ctx(ac->module));
163                                 return ldb_module_done(ac->req, NULL,
164                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
165                         }
166                         modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
167                         if (!modified_partition->dn) {
168                                 ldb_oom(ldb_module_get_ctx(ac->module));
169                                 return ldb_module_done(ac->req, NULL,
170                                                        NULL, LDB_ERR_OPERATIONS_ERROR);
171                         }
172                         DLIST_ADD(replmd_private->ncs, modified_partition);
173                 }
174
175                 if (ac->seq_num > modified_partition->mod_usn) {
176                         modified_partition->mod_usn = ac->seq_num;
177                 }
178         }
179
180         if (ac->apply_mode) {
181                 talloc_free(ares);
182                 ac->index_current++;
183                 
184                 ret = replmd_replicated_apply_next(ac);
185                 if (ret != LDB_SUCCESS) {
186                         return ldb_module_done(ac->req, NULL, NULL, ret);
187                 }
188                 return ret;
189         } else {
190                 /* free the partition control container here, for the
191                  * common path.  Other cases will have it cleaned up
192                  * eventually with the ares */
193                 talloc_free(partition_ctrl);
194                 return ldb_module_done(ac->req, 
195                                        controls_except_specified(controls, ares, partition_ctrl),
196                                        ares->response, LDB_SUCCESS);
197         }
198 }
199
200
201 /*
202  * update a @REPLCHANGED record in each partition if there have been
203  * any writes of replicated data in the partition
204  */
205 static int replmd_notify_store(struct ldb_module *module)
206 {
207         struct replmd_private *replmd_private = 
208                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
209         struct ldb_context *ldb = ldb_module_get_ctx(module);
210
211         while (replmd_private->ncs) {
212                 int ret;
213                 struct nc_entry *modified_partition = replmd_private->ncs;
214
215                 ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
216                 if (ret != LDB_SUCCESS) {
217                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
218                                  ldb_dn_get_linearized(modified_partition->dn)));
219                         return ret;
220                 }
221                 DLIST_REMOVE(replmd_private->ncs, modified_partition);
222                 talloc_free(modified_partition);
223         }
224
225         return LDB_SUCCESS;
226 }
227
228
229 /*
230   created a replmd_replicated_request context
231  */
232 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
233                                                          struct ldb_request *req)
234 {
235         struct ldb_context *ldb;
236         struct replmd_replicated_request *ac;
237
238         ldb = ldb_module_get_ctx(module);
239
240         ac = talloc_zero(req, struct replmd_replicated_request);
241         if (ac == NULL) {
242                 ldb_oom(ldb);
243                 return NULL;
244         }
245
246         ac->module = module;
247         ac->req = req;
248
249         ac->schema = dsdb_get_schema(ldb);
250         if (!ac->schema) {
251                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
252                               "replmd_modify: no dsdb_schema loaded");
253                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
254                 return NULL;
255         }
256
257         return ac;
258 }
259
260 /*
261   add a time element to a record
262 */
263 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
264 {
265         struct ldb_message_element *el;
266         char *s;
267
268         if (ldb_msg_find_element(msg, attr) != NULL) {
269                 return LDB_SUCCESS;
270         }
271
272         s = ldb_timestring(msg, t);
273         if (s == NULL) {
274                 return LDB_ERR_OPERATIONS_ERROR;
275         }
276
277         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
278                 return LDB_ERR_OPERATIONS_ERROR;
279         }
280
281         el = ldb_msg_find_element(msg, attr);
282         /* always set as replace. This works because on add ops, the flag
283            is ignored */
284         el->flags = LDB_FLAG_MOD_REPLACE;
285
286         return LDB_SUCCESS;
287 }
288
289 /*
290   add a uint64_t element to a record
291 */
292 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
293 {
294         struct ldb_message_element *el;
295
296         if (ldb_msg_find_element(msg, attr) != NULL) {
297                 return LDB_SUCCESS;
298         }
299
300         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
301                 return LDB_ERR_OPERATIONS_ERROR;
302         }
303
304         el = ldb_msg_find_element(msg, attr);
305         /* always set as replace. This works because on add ops, the flag
306            is ignored */
307         el->flags = LDB_FLAG_MOD_REPLACE;
308
309         return LDB_SUCCESS;
310 }
311
312 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
313                                                    const struct replPropertyMetaData1 *m2,
314                                                    const uint32_t *rdn_attid)
315 {
316         if (m1->attid == m2->attid) {
317                 return 0;
318         }
319
320         /*
321          * the rdn attribute should be at the end!
322          * so we need to return a value greater than zero
323          * which means m1 is greater than m2
324          */
325         if (m1->attid == *rdn_attid) {
326                 return 1;
327         }
328
329         /*
330          * the rdn attribute should be at the end!
331          * so we need to return a value less than zero
332          * which means m2 is greater than m1
333          */
334         if (m2->attid == *rdn_attid) {
335                 return -1;
336         }
337
338         return m1->attid - m2->attid;
339 }
340
341 static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
342                                                 const struct dsdb_schema *schema,
343                                                 struct ldb_dn *dn)
344 {
345         const char *rdn_name;
346         const struct dsdb_attribute *rdn_sa;
347
348         rdn_name = ldb_dn_get_rdn_name(dn);
349         if (!rdn_name) {
350                 DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
351                 return LDB_ERR_OPERATIONS_ERROR;
352         }
353
354         rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
355         if (rdn_sa == NULL) {
356                 DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
357                 return LDB_ERR_OPERATIONS_ERROR;                
358         }
359
360         DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
361                  rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
362
363         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
364                   discard_const_p(void, &rdn_sa->attributeID_id), 
365                   (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
366
367         return LDB_SUCCESS;
368 }
369
370 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
371                                                  const struct ldb_message_element *e2,
372                                                  const struct dsdb_schema *schema)
373 {
374         const struct dsdb_attribute *a1;
375         const struct dsdb_attribute *a2;
376
377         /* 
378          * TODO: make this faster by caching the dsdb_attribute pointer
379          *       on the ldb_messag_element
380          */
381
382         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
383         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
384
385         /*
386          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
387          *       in the schema
388          */
389         if (!a1 || !a2) {
390                 return strcasecmp(e1->name, e2->name);
391         }
392
393         return a1->attributeID_id - a2->attributeID_id;
394 }
395
396 static void replmd_ldb_message_sort(struct ldb_message *msg,
397                                     const struct dsdb_schema *schema)
398 {
399         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
400                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
401 }
402
403 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
404 {
405         struct ldb_context *ldb;
406         struct ldb_control *control;
407         struct ldb_control **saved_controls;
408         struct replmd_replicated_request *ac;
409         enum ndr_err_code ndr_err;
410         struct ldb_request *down_req;
411         struct ldb_message *msg;
412         const DATA_BLOB *guid_blob;
413         struct GUID guid;
414         struct ldb_val guid_value;
415         struct replPropertyMetaDataBlob nmd;
416         struct ldb_val nmd_value;
417         const struct GUID *our_invocation_id;
418         time_t t = time(NULL);
419         NTTIME now;
420         char *time_str;
421         int ret;
422         uint32_t i, ni=0;
423         bool allow_add_guid = false;
424         bool remove_current_guid = false;
425
426         /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
427         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
428         if (control) {
429                 allow_add_guid = 1;
430         }
431
432         /* do not manipulate our control entries */
433         if (ldb_dn_is_special(req->op.add.message->dn)) {
434                 return ldb_next_request(module, req);
435         }
436
437         ldb = ldb_module_get_ctx(module);
438
439         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
440
441         ac = replmd_ctx_init(module, req);
442         if (!ac) {
443                 return LDB_ERR_OPERATIONS_ERROR;
444         }
445
446         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
447         if ( guid_blob != NULL ) {
448                 if( !allow_add_guid ) {
449                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
450                               "replmd_add: it's not allowed to add an object with objectGUID\n");
451                         talloc_free(ac);
452                         return LDB_ERR_UNWILLING_TO_PERFORM;
453                 } else {
454                         NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
455                         if ( !NT_STATUS_IS_OK(status)) {
456                                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
457                                       "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
458                                 talloc_free(ac);
459                                 return LDB_ERR_UNWILLING_TO_PERFORM;
460                         }
461                         /* we remove this attribute as it can be a string and will not be treated 
462                         correctly and then we will readd it latter on in the good format*/
463                         remove_current_guid = true;
464                 }
465         } else {
466                 /* a new GUID */
467                 guid = GUID_random();
468         }
469
470         /* Get a sequence number from the backend */
471         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
472         if (ret != LDB_SUCCESS) {
473                 talloc_free(ac);
474                 return ret;
475         }
476
477         /* get our invocationId */
478         our_invocation_id = samdb_ntds_invocation_id(ldb);
479         if (!our_invocation_id) {
480                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
481                               "replmd_add: unable to find invocationId\n");
482                 talloc_free(ac);
483                 return LDB_ERR_OPERATIONS_ERROR;
484         }
485
486         /* we have to copy the message as the caller might have it as a const */
487         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
488         if (msg == NULL) {
489                 ldb_oom(ldb);
490                 talloc_free(ac);
491                 return LDB_ERR_OPERATIONS_ERROR;
492         }
493
494         /* generated times */
495         unix_to_nt_time(&now, t);
496         time_str = ldb_timestring(msg, t);
497         if (!time_str) {
498                 ldb_oom(ldb);
499                 talloc_free(ac);
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502         if (remove_current_guid) {
503                 ldb_msg_remove_attr(msg,"objectGUID");
504         }
505
506         /* 
507          * remove autogenerated attributes
508          */
509         ldb_msg_remove_attr(msg, "whenCreated");
510         ldb_msg_remove_attr(msg, "whenChanged");
511         ldb_msg_remove_attr(msg, "uSNCreated");
512         ldb_msg_remove_attr(msg, "uSNChanged");
513         ldb_msg_remove_attr(msg, "replPropertyMetaData");
514
515         /*
516          * readd replicated attributes
517          */
518         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
519         if (ret != LDB_SUCCESS) {
520                 ldb_oom(ldb);
521                 talloc_free(ac);
522                 return ret;
523         }
524
525         /* build the replication meta_data */
526         ZERO_STRUCT(nmd);
527         nmd.version             = 1;
528         nmd.ctr.ctr1.count      = msg->num_elements;
529         nmd.ctr.ctr1.array      = talloc_array(msg,
530                                                struct replPropertyMetaData1,
531                                                nmd.ctr.ctr1.count);
532         if (!nmd.ctr.ctr1.array) {
533                 ldb_oom(ldb);
534                 talloc_free(ac);
535                 return LDB_ERR_OPERATIONS_ERROR;
536         }
537
538         for (i=0; i < msg->num_elements; i++) {
539                 struct ldb_message_element *e = &msg->elements[i];
540                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
541                 const struct dsdb_attribute *sa;
542
543                 if (e->name[0] == '@') continue;
544
545                 sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
546                 if (!sa) {
547                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
548                                       "replmd_add: attribute '%s' not defined in schema\n",
549                                       e->name);
550                         talloc_free(ac);
551                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
552                 }
553
554                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
555                         /* if the attribute is not replicated (0x00000001)
556                          * or constructed (0x00000004) it has no metadata
557                          */
558                         continue;
559                 }
560
561                 m->attid                        = sa->attributeID_id;
562                 m->version                      = 1;
563                 m->originating_change_time      = now;
564                 m->originating_invocation_id    = *our_invocation_id;
565                 m->originating_usn              = ac->seq_num;
566                 m->local_usn                    = ac->seq_num;
567                 ni++;
568         }
569
570         /* fix meta data count */
571         nmd.ctr.ctr1.count = ni;
572
573         /*
574          * sort meta data array, and move the rdn attribute entry to the end
575          */
576         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
577         if (ret != LDB_SUCCESS) {
578                 talloc_free(ac);
579                 return ret;
580         }
581
582         /* generated NDR encoded values */
583         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
584                                        NULL,
585                                        &guid,
586                                        (ndr_push_flags_fn_t)ndr_push_GUID);
587         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
588                 ldb_oom(ldb);
589                 talloc_free(ac);
590                 return LDB_ERR_OPERATIONS_ERROR;
591         }
592         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
593                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
594                                        &nmd,
595                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
596         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
597                 ldb_oom(ldb);
598                 talloc_free(ac);
599                 return LDB_ERR_OPERATIONS_ERROR;
600         }
601
602         /*
603          * add the autogenerated values
604          */
605         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
606         if (ret != LDB_SUCCESS) {
607                 ldb_oom(ldb);
608                 talloc_free(ac);
609                 return ret;
610         }
611         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
612         if (ret != LDB_SUCCESS) {
613                 ldb_oom(ldb);
614                 talloc_free(ac);
615                 return ret;
616         }
617         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
618         if (ret != LDB_SUCCESS) {
619                 ldb_oom(ldb);
620                 talloc_free(ac);
621                 return ret;
622         }
623         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
624         if (ret != LDB_SUCCESS) {
625                 ldb_oom(ldb);
626                 talloc_free(ac);
627                 return ret;
628         }
629         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
630         if (ret != LDB_SUCCESS) {
631                 ldb_oom(ldb);
632                 talloc_free(ac);
633                 return ret;
634         }
635
636         /*
637          * sort the attributes by attid before storing the object
638          */
639         replmd_ldb_message_sort(msg, ac->schema);
640
641         ret = ldb_build_add_req(&down_req, ldb, ac,
642                                 msg,
643                                 req->controls,
644                                 ac, replmd_op_callback,
645                                 req);
646         if (ret != LDB_SUCCESS) {
647                 talloc_free(ac);
648                 return ret;
649         }
650
651         /* if a control is there remove if from the modified request */
652         if (control && !save_controls(control, down_req, &saved_controls)) {
653                 talloc_free(ac);
654                 return LDB_ERR_OPERATIONS_ERROR;
655         }
656
657         /* go on with the call chain */
658         return ldb_next_request(module, down_req);
659 }
660
661
662 /*
663  * update the replPropertyMetaData for one element
664  */
665 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
666                                       struct ldb_message *msg,
667                                       struct ldb_message_element *el,
668                                       struct replPropertyMetaDataBlob *omd,
669                                       const struct dsdb_schema *schema,
670                                       uint64_t *seq_num,
671                                       const struct GUID *our_invocation_id,
672                                       NTTIME now)
673 {
674         int i;
675         const struct dsdb_attribute *a;
676         struct replPropertyMetaData1 *md1;
677
678         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
679         if (a == NULL) {
680                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
681                          el->name));
682                 return LDB_ERR_OPERATIONS_ERROR;
683         }
684
685         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
686                 return LDB_SUCCESS;
687         }
688
689         for (i=0; i<omd->ctr.ctr1.count; i++) {
690                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
691         }
692         if (i == omd->ctr.ctr1.count) {
693                 /* we need to add a new one */
694                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
695                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
696                 if (omd->ctr.ctr1.array == NULL) {
697                         ldb_oom(ldb);
698                         return LDB_ERR_OPERATIONS_ERROR;
699                 }
700                 omd->ctr.ctr1.count++;
701                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
702         }
703
704         /* Get a new sequence number from the backend. We only do this
705          * if we have a change that requires a new
706          * replPropertyMetaData element 
707          */
708         if (*seq_num == 0) {
709                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
710                 if (ret != LDB_SUCCESS) {
711                         return LDB_ERR_OPERATIONS_ERROR;
712                 }
713         }
714
715         md1 = &omd->ctr.ctr1.array[i];
716         md1->version++;
717         md1->attid                     = a->attributeID_id;
718         md1->originating_change_time   = now;
719         md1->originating_invocation_id = *our_invocation_id;
720         md1->originating_usn           = *seq_num;
721         md1->local_usn                 = *seq_num;
722         
723         return LDB_SUCCESS;
724 }
725
726 /*
727  * update the replPropertyMetaData object each time we modify an
728  * object. This is needed for DRS replication, as the merge on the
729  * client is based on this object 
730  */
731 static int replmd_update_rpmd(struct ldb_module *module, 
732                               const struct dsdb_schema *schema, 
733                               struct ldb_message *msg, uint64_t *seq_num)
734 {
735         const struct ldb_val *omd_value;
736         enum ndr_err_code ndr_err;
737         struct replPropertyMetaDataBlob omd;
738         int i;
739         time_t t = time(NULL);
740         NTTIME now;
741         const struct GUID *our_invocation_id;
742         int ret;
743         const char *attrs[] = { "replPropertyMetaData" , NULL };
744         struct ldb_result *res;
745         struct ldb_context *ldb;
746
747         ldb = ldb_module_get_ctx(module);
748
749         our_invocation_id = samdb_ntds_invocation_id(ldb);
750         if (!our_invocation_id) {
751                 /* this happens during an initial vampire while
752                    updating the schema */
753                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
754                 return LDB_SUCCESS;
755         }
756
757         unix_to_nt_time(&now, t);
758
759         /* search for the existing replPropertyMetaDataBlob */
760         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
761         if (ret != LDB_SUCCESS || res->count != 1) {
762                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
763                          ldb_dn_get_linearized(msg->dn)));
764                 return LDB_ERR_OPERATIONS_ERROR;
765         }
766                 
767
768         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
769         if (!omd_value) {
770                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
771                          ldb_dn_get_linearized(msg->dn)));
772                 return LDB_ERR_OPERATIONS_ERROR;
773         }
774
775         ndr_err = ndr_pull_struct_blob(omd_value, msg,
776                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
777                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
778         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
779                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
780                          ldb_dn_get_linearized(msg->dn)));
781                 return LDB_ERR_OPERATIONS_ERROR;
782         }
783
784         if (omd.version != 1) {
785                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
786                          omd.version, ldb_dn_get_linearized(msg->dn)));
787                 return LDB_ERR_OPERATIONS_ERROR;
788         }
789
790         for (i=0; i<msg->num_elements; i++) {
791                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
792                                                  our_invocation_id, now);
793                 if (ret != LDB_SUCCESS) {
794                         return ret;
795                 }
796         }
797
798         /*
799          * replmd_update_rpmd_element has done an update if the
800          * seq_num is set
801          */
802         if (*seq_num != 0) {
803                 struct ldb_val *md_value;
804                 struct ldb_message_element *el;
805
806                 md_value = talloc(msg, struct ldb_val);
807                 if (md_value == NULL) {
808                         ldb_oom(ldb);
809                         return LDB_ERR_OPERATIONS_ERROR;
810                 }
811
812                 ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
813                 if (ret != LDB_SUCCESS) {
814                         return ret;
815                 }
816
817                 ndr_err = ndr_push_struct_blob(md_value, msg, 
818                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
819                                                &omd,
820                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
821                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
822                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
823                                  ldb_dn_get_linearized(msg->dn)));
824                         return LDB_ERR_OPERATIONS_ERROR;
825                 }
826
827                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
828                 if (ret != LDB_SUCCESS) {
829                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
830                                  ldb_dn_get_linearized(msg->dn)));
831                         return ret;
832                 }
833
834                 el->num_values = 1;
835                 el->values = md_value;
836         }
837
838         return LDB_SUCCESS;     
839 }
840
841
842 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
843 {
844         struct ldb_context *ldb;
845         struct replmd_replicated_request *ac;
846         struct ldb_request *down_req;
847         struct ldb_message *msg;
848         struct ldb_result *res;
849         time_t t = time(NULL);
850         uint64_t seq_num = 0;
851         int ret;
852
853         /* do not manipulate our control entries */
854         if (ldb_dn_is_special(req->op.mod.message->dn)) {
855                 return ldb_next_request(module, req);
856         }
857
858         ldb = ldb_module_get_ctx(module);
859
860         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
861
862         ac = replmd_ctx_init(module, req);
863         if (!ac) {
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         /* we have to copy the message as the caller might have it as a const */
868         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
869         if (msg == NULL) {
870                 ldb_oom(ldb);
871                 talloc_free(ac);
872                 return LDB_ERR_OPERATIONS_ERROR;
873         }
874
875         /* TODO:
876          * - give an error when a readonly attribute should
877          *   be modified
878          * - merge the changed into the old object
879          *   if the caller set values to the same value
880          *   ignore the attribute, return success when no
881          *   attribute was changed
882          */
883
884         ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
885         if (ret != LDB_SUCCESS) {
886                 talloc_free(ac);
887                 return ret;
888         }
889
890         ret = replmd_update_rpmd(module, ac->schema, msg, &ac->seq_num);
891         if (ret != LDB_SUCCESS) {
892                 talloc_free(ac);
893                 return ret;
894         }
895
896         /* TODO:
897          * - replace the old object with the newly constructed one
898          */
899
900         ret = ldb_build_mod_req(&down_req, ldb, ac,
901                                 msg,
902                                 req->controls,
903                                 ac, replmd_op_callback,
904                                 req);
905         if (ret != LDB_SUCCESS) {
906                 talloc_free(ac);
907                 return ret;
908         }
909         talloc_steal(down_req, msg);
910
911         /* we only change whenChanged and uSNChanged if the seq_num
912            has changed */
913         if (seq_num != 0) {
914                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
915                         talloc_free(ac);
916                         return ret;
917                 }
918
919                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
920                         talloc_free(ac);
921                         return ret;
922                 }
923         }
924
925         /* go on with the call chain */
926         return ldb_next_request(module, down_req);
927 }
928
929 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
930
931 /*
932   handle a rename request
933
934   On a rename we need to do an extra ldb_modify which sets the
935   whenChanged and uSNChanged attributes.  We do this in a callback after the success.
936  */
937 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
938 {
939         struct ldb_context *ldb;
940         struct replmd_replicated_request *ac;
941         int ret;
942         struct ldb_request *down_req;
943
944         /* do not manipulate our control entries */
945         if (ldb_dn_is_special(req->op.mod.message->dn)) {
946                 return ldb_next_request(module, req);
947         }
948
949         ldb = ldb_module_get_ctx(module);
950
951         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
952
953         ac = replmd_ctx_init(module, req);
954         if (!ac) {
955                 return LDB_ERR_OPERATIONS_ERROR;
956         }
957         ret = ldb_build_rename_req(&down_req, ldb, ac,
958                                    ac->req->op.rename.olddn,
959                                    ac->req->op.rename.newdn,
960                                    ac->req->controls,
961                                    ac, replmd_rename_callback,
962                                    ac->req);
963
964         if (ret != LDB_SUCCESS) {
965                 talloc_free(ac);
966                 return ret;
967         }
968
969         /* go on with the call chain */
970         return ldb_next_request(module, down_req);
971 }
972
973 /* After the rename is compleated, update the whenchanged etc */
974 static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
975 {
976         struct ldb_context *ldb;
977         struct replmd_replicated_request *ac;
978         struct ldb_request *down_req;
979         struct ldb_message *msg;
980         time_t t = time(NULL);
981         int ret;
982
983         ac = talloc_get_type(req->context, struct replmd_replicated_request);
984         ldb = ldb_module_get_ctx(ac->module);
985
986         if (ares->error != LDB_SUCCESS) {
987                 return ldb_module_done(ac->req, ares->controls,
988                                         ares->response, ares->error);
989         }
990
991         if (ares->type != LDB_REPLY_DONE) {
992                 ldb_set_errstring(ldb,
993                                   "invalid ldb_reply_type in callback");
994                 talloc_free(ares);
995                 return ldb_module_done(ac->req, NULL, NULL,
996                                         LDB_ERR_OPERATIONS_ERROR);
997         }
998
999         /* Get a sequence number from the backend */
1000         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
1001         if (ret != LDB_SUCCESS) {
1002                 return ret;
1003         }
1004
1005         /* TODO:
1006          * - replace the old object with the newly constructed one
1007          */
1008
1009         msg = ldb_msg_new(ac);
1010         if (msg == NULL) {
1011                 ldb_oom(ldb);
1012                 return LDB_ERR_OPERATIONS_ERROR;
1013         }
1014
1015         msg->dn = ac->req->op.rename.newdn;
1016
1017         ret = ldb_build_mod_req(&down_req, ldb, ac,
1018                                 msg,
1019                                 req->controls,
1020                                 ac, replmd_op_callback,
1021                                 req);
1022
1023         if (ret != LDB_SUCCESS) {
1024                 talloc_free(ac);
1025                 return ret;
1026         }
1027         talloc_steal(down_req, msg);
1028
1029         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1030                 talloc_free(ac);
1031                 return ret;
1032         }
1033         
1034         if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
1035                 talloc_free(ac);
1036                 return ret;
1037         }
1038
1039         /* go on with the call chain - do the modify after the rename */
1040         return ldb_next_request(ac->module, down_req);
1041 }
1042
1043
/*
  Single exit point for LDB errors raised while applying replicated
  objects.  The request context is currently unused and the code is
  handed back unchanged.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        const int ldb_status = ret;

        return ldb_status;
}
1048
1049 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1050 {
1051         int ret = LDB_ERR_OTHER;
1052         /* TODO: do some error mapping */
1053         return ret;
1054 }
1055
1056 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1057 {
1058         struct ldb_context *ldb;
1059         struct ldb_request *change_req;
1060         enum ndr_err_code ndr_err;
1061         struct ldb_message *msg;
1062         struct replPropertyMetaDataBlob *md;
1063         struct ldb_val md_value;
1064         uint32_t i;
1065         int ret;
1066
1067         /*
1068          * TODO: check if the parent object exist
1069          */
1070
1071         /*
1072          * TODO: handle the conflict case where an object with the
1073          *       same name exist
1074          */
1075
1076         ldb = ldb_module_get_ctx(ar->module);
1077         msg = ar->objs->objects[ar->index_current].msg;
1078         md = ar->objs->objects[ar->index_current].meta_data;
1079
1080         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1081         if (ret != LDB_SUCCESS) {
1082                 return replmd_replicated_request_error(ar, ret);
1083         }
1084
1085         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1086         if (ret != LDB_SUCCESS) {
1087                 return replmd_replicated_request_error(ar, ret);
1088         }
1089
1090         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1091         if (ret != LDB_SUCCESS) {
1092                 return replmd_replicated_request_error(ar, ret);
1093         }
1094
1095         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
1096         if (ret != LDB_SUCCESS) {
1097                 return replmd_replicated_request_error(ar, ret);
1098         }
1099
1100         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1101         if (ret != LDB_SUCCESS) {
1102                 return replmd_replicated_request_error(ar, ret);
1103         }
1104
1105         /* remove any message elements that have zero values */
1106         for (i=0; i<msg->num_elements; i++) {
1107                 if (msg->elements[i].num_values == 0) {
1108                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1109                                  msg->elements[i].name));
1110                         memmove(&msg->elements[i], 
1111                                 &msg->elements[i+1], 
1112                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1113                         msg->num_elements--;
1114                         i--;
1115                 }
1116         }
1117         
1118         /*
1119          * the meta data array is already sorted by the caller
1120          */
1121         for (i=0; i < md->ctr.ctr1.count; i++) {
1122                 md->ctr.ctr1.array[i].local_usn = ar->seq_num;
1123         }
1124         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1125                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1126                                        md,
1127                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1128         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1129                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1130                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1131         }
1132         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1133         if (ret != LDB_SUCCESS) {
1134                 return replmd_replicated_request_error(ar, ret);
1135         }
1136
1137         replmd_ldb_message_sort(msg, ar->schema);
1138
1139         if (DEBUGLVL(4)) {
1140                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1141                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1142                 talloc_free(s);
1143         }
1144
1145         ret = ldb_build_add_req(&change_req,
1146                                 ldb,
1147                                 ar,
1148                                 msg,
1149                                 ar->controls,
1150                                 ar,
1151                                 replmd_op_callback,
1152                                 ar->req);
1153         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1154
1155         return ldb_next_request(ar->module, change_req);
1156 }
1157
1158 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1159                                                          struct replPropertyMetaData1 *m2)
1160 {
1161         int ret;
1162
1163         if (m1->version != m2->version) {
1164                 return m1->version - m2->version;
1165         }
1166
1167         if (m1->originating_change_time != m2->originating_change_time) {
1168                 return m1->originating_change_time - m2->originating_change_time;
1169         }
1170
1171         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1172         if (ret != 0) {
1173                 return ret;
1174         }
1175
1176         return m1->originating_usn - m2->originating_usn;
1177 }
1178
1179 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1180 {
1181         struct ldb_context *ldb;
1182         struct ldb_request *change_req;
1183         enum ndr_err_code ndr_err;
1184         struct ldb_message *msg;
1185         struct replPropertyMetaDataBlob *rmd;
1186         struct replPropertyMetaDataBlob omd;
1187         const struct ldb_val *omd_value;
1188         struct replPropertyMetaDataBlob nmd;
1189         struct ldb_val nmd_value;
1190         uint32_t i,j,ni=0;
1191         uint32_t removed_attrs = 0;
1192         int ret;
1193
1194         ldb = ldb_module_get_ctx(ar->module);
1195         msg = ar->objs->objects[ar->index_current].msg;
1196         rmd = ar->objs->objects[ar->index_current].meta_data;
1197         ZERO_STRUCT(omd);
1198         omd.version = 1;
1199
1200         /*
1201          * TODO: check repl data is correct after a rename
1202          */
1203         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1204                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1205                           ldb_dn_get_linearized(ar->search_msg->dn),
1206                           ldb_dn_get_linearized(msg->dn));
1207                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1208                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1209                                   ldb_dn_get_linearized(ar->search_msg->dn),
1210                                   ldb_dn_get_linearized(msg->dn),
1211                                   ldb_errstring(ldb));
1212                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1213                 }
1214         }
1215
1216         /* find existing meta data */
1217         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1218         if (omd_value) {
1219                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1220                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1221                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1222                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1223                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1224                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1225                 }
1226
1227                 if (omd.version != 1) {
1228                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1229                 }
1230         }
1231
1232         ZERO_STRUCT(nmd);
1233         nmd.version = 1;
1234         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1235         nmd.ctr.ctr1.array = talloc_array(ar,
1236                                           struct replPropertyMetaData1,
1237                                           nmd.ctr.ctr1.count);
1238         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1239
1240         /* first copy the old meta data */
1241         for (i=0; i < omd.ctr.ctr1.count; i++) {
1242                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1243                 ni++;
1244         }
1245
1246         /* now merge in the new meta data */
1247         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1248                 bool found = false;
1249
1250                 for (j=0; j < ni; j++) {
1251                         int cmp;
1252
1253                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1254                                 continue;
1255                         }
1256
1257                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1258                                                                             &nmd.ctr.ctr1.array[j]);
1259                         if (cmp > 0) {
1260                                 /* replace the entry */
1261                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1262                                 found = true;
1263                                 break;
1264                         }
1265
1266                         /* we don't want to apply this change so remove the attribute */
1267                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1268                         removed_attrs++;
1269
1270                         found = true;
1271                         break;
1272                 }
1273
1274                 if (found) continue;
1275
1276                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1277                 ni++;
1278         }
1279
1280         /*
1281          * finally correct the size of the meta_data array
1282          */
1283         nmd.ctr.ctr1.count = ni;
1284
1285         /*
1286          * the rdn attribute (the alias for the name attribute),
1287          * 'cn' for most objects is the last entry in the meta data array
1288          * we have stored
1289          *
1290          * sort the new meta data array
1291          */
1292         ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
1293         if (ret != LDB_SUCCESS) {
1294                 return ret;
1295         }
1296
1297         /*
1298          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1299          */
1300         if (msg->num_elements == 0) {
1301                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1302                           ar->index_current);
1303
1304                 ar->index_current++;
1305                 return replmd_replicated_apply_next(ar);
1306         }
1307
1308         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1309                   ar->index_current, msg->num_elements);
1310
1311         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
1312         if (ret != LDB_SUCCESS) {
1313                 return replmd_replicated_request_error(ar, ret);
1314         }
1315
1316         for (i=0; i<ni; i++) {
1317                 nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
1318         }
1319
1320         /* create the meta data value */
1321         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1322                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1323                                        &nmd,
1324                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1325         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1326                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1327                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1328         }
1329
1330         /*
1331          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1332          * and replPopertyMetaData attributes
1333          */
1334         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1335         if (ret != LDB_SUCCESS) {
1336                 return replmd_replicated_request_error(ar, ret);
1337         }
1338         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
1339         if (ret != LDB_SUCCESS) {
1340                 return replmd_replicated_request_error(ar, ret);
1341         }
1342         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1343         if (ret != LDB_SUCCESS) {
1344                 return replmd_replicated_request_error(ar, ret);
1345         }
1346
1347         replmd_ldb_message_sort(msg, ar->schema);
1348
1349         /* we want to replace the old values */
1350         for (i=0; i < msg->num_elements; i++) {
1351                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1352         }
1353
1354         if (DEBUGLVL(4)) {
1355                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1356                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1357                 talloc_free(s);
1358         }
1359
1360         ret = ldb_build_mod_req(&change_req,
1361                                 ldb,
1362                                 ar,
1363                                 msg,
1364                                 ar->controls,
1365                                 ar,
1366                                 replmd_op_callback,
1367                                 ar->req);
1368         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1369
1370         return ldb_next_request(ar->module, change_req);
1371 }
1372
1373 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1374                                                    struct ldb_reply *ares)
1375 {
1376         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1377                                                struct replmd_replicated_request);
1378         int ret;
1379
1380         if (!ares) {
1381                 return ldb_module_done(ar->req, NULL, NULL,
1382                                         LDB_ERR_OPERATIONS_ERROR);
1383         }
1384         if (ares->error != LDB_SUCCESS &&
1385             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1386                 return ldb_module_done(ar->req, ares->controls,
1387                                         ares->response, ares->error);
1388         }
1389
1390         switch (ares->type) {
1391         case LDB_REPLY_ENTRY:
1392                 ar->search_msg = talloc_steal(ar, ares->message);
1393                 break;
1394
1395         case LDB_REPLY_REFERRAL:
1396                 /* we ignore referrals */
1397                 break;
1398
1399         case LDB_REPLY_DONE:
1400                 if (ar->search_msg != NULL) {
1401                         ret = replmd_replicated_apply_merge(ar);
1402                 } else {
1403                         ret = replmd_replicated_apply_add(ar);
1404                 }
1405                 if (ret != LDB_SUCCESS) {
1406                         return ldb_module_done(ar->req, NULL, NULL, ret);
1407                 }
1408         }
1409
1410         talloc_free(ares);
1411         return LDB_SUCCESS;
1412 }
1413
1414 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1415
1416 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1417 {
1418         struct ldb_context *ldb;
1419         int ret;
1420         char *tmp_str;
1421         char *filter;
1422         struct ldb_request *search_req;
1423         struct ldb_search_options_control *options;
1424
1425         if (ar->index_current >= ar->objs->num_objects) {
1426                 /* done with it, go to next stage */
1427                 return replmd_replicated_uptodate_vector(ar);
1428         }
1429
1430         ldb = ldb_module_get_ctx(ar->module);
1431         ar->search_msg = NULL;
1432
1433         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1434         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1435
1436         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1437         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1438         talloc_free(tmp_str);
1439
1440         ret = ldb_build_search_req(&search_req,
1441                                    ldb,
1442                                    ar,
1443                                    NULL,
1444                                    LDB_SCOPE_SUBTREE,
1445                                    filter,
1446                                    NULL,
1447                                    NULL,
1448                                    ar,
1449                                    replmd_replicated_apply_search_callback,
1450                                    ar->req);
1451
1452         ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
1453         if (ret != LDB_SUCCESS) {
1454                 return ret;
1455         }
1456
1457         /* we need to cope with cross-partition links, so search for
1458            the GUID over all partitions */
1459         options = talloc(search_req, struct ldb_search_options_control);
1460         if (options == NULL) {
1461                 DEBUG(0, (__location__ ": out of memory\n"));
1462                 return LDB_ERR_OPERATIONS_ERROR;
1463         }
1464         options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
1465
1466         ret = ldb_request_add_control(search_req,
1467                                       LDB_CONTROL_SEARCH_OPTIONS_OID,
1468                                       true, options);
1469         if (ret != LDB_SUCCESS) {
1470                 return ret;
1471         }
1472
1473         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1474
1475         return ldb_next_request(ar->module, search_req);
1476 }
1477
1478 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1479                                                       struct ldb_reply *ares)
1480 {
1481         struct ldb_context *ldb;
1482         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1483                                                struct replmd_replicated_request);
1484         ldb = ldb_module_get_ctx(ar->module);
1485
1486         if (!ares) {
1487                 return ldb_module_done(ar->req, NULL, NULL,
1488                                         LDB_ERR_OPERATIONS_ERROR);
1489         }
1490         if (ares->error != LDB_SUCCESS) {
1491                 return ldb_module_done(ar->req, ares->controls,
1492                                         ares->response, ares->error);
1493         }
1494
1495         if (ares->type != LDB_REPLY_DONE) {
1496                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1497                 return ldb_module_done(ar->req, NULL, NULL,
1498                                         LDB_ERR_OPERATIONS_ERROR);
1499         }
1500
1501         talloc_free(ares);
1502
1503         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1504 }
1505
1506 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1507 {
1508         struct ldb_context *ldb;
1509         struct ldb_request *change_req;
1510         enum ndr_err_code ndr_err;
1511         struct ldb_message *msg;
1512         struct replUpToDateVectorBlob ouv;
1513         const struct ldb_val *ouv_value;
1514         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1515         struct replUpToDateVectorBlob nuv;
1516         struct ldb_val nuv_value;
1517         struct ldb_message_element *nuv_el = NULL;
1518         const struct GUID *our_invocation_id;
1519         struct ldb_message_element *orf_el = NULL;
1520         struct repsFromToBlob nrf;
1521         struct ldb_val *nrf_value = NULL;
1522         struct ldb_message_element *nrf_el = NULL;
1523         uint32_t i,j,ni=0;
1524         bool found = false;
1525         time_t t = time(NULL);
1526         NTTIME now;
1527         int ret;
1528
1529         ldb = ldb_module_get_ctx(ar->module);
1530         ruv = ar->objs->uptodateness_vector;
1531         ZERO_STRUCT(ouv);
1532         ouv.version = 2;
1533         ZERO_STRUCT(nuv);
1534         nuv.version = 2;
1535
1536         unix_to_nt_time(&now, t);
1537
1538         /*
1539          * first create the new replUpToDateVector
1540          */
1541         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1542         if (ouv_value) {
1543                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1544                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1545                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1546                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1547                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1548                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1549                 }
1550
1551                 if (ouv.version != 2) {
1552                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1553                 }
1554         }
1555
1556         /*
1557          * the new uptodateness vector will at least
1558          * contain 1 entry, one for the source_dsa
1559          *
1560          * plus optional values from our old vector and the one from the source_dsa
1561          */
1562         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1563         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1564         nuv.ctr.ctr2.cursors = talloc_array(ar,
1565                                             struct drsuapi_DsReplicaCursor2,
1566                                             nuv.ctr.ctr2.count);
1567         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1568
1569         /* first copy the old vector */
1570         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1571                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1572                 ni++;
1573         }
1574
1575         /* get our invocation_id if we have one already attached to the ldb */
1576         our_invocation_id = samdb_ntds_invocation_id(ldb);
1577
1578         /* merge in the source_dsa vector if available */
1579         for (i=0; (ruv && i < ruv->count); i++) {
1580                 found = false;
1581
1582                 if (our_invocation_id &&
1583                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1584                                our_invocation_id)) {
1585                         continue;
1586                 }
1587
1588                 for (j=0; j < ni; j++) {
1589                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1590                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1591                                 continue;
1592                         }
1593
1594                         found = true;
1595
1596                         /*
1597                          * we update only the highest_usn and not the latest_sync_success time,
1598                          * because the last success stands for direct replication
1599                          */
1600                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1601                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1602                         }
1603                         break;                  
1604                 }
1605
1606                 if (found) continue;
1607
1608                 /* if it's not there yet, add it */
1609                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1610                 ni++;
1611         }
1612
1613         /*
1614          * merge in the current highwatermark for the source_dsa
1615          */
1616         found = false;
1617         for (j=0; j < ni; j++) {
1618                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1619                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1620                         continue;
1621                 }
1622
1623                 found = true;
1624
1625                 /*
1626                  * here we update the highest_usn and last_sync_success time
1627                  * because we're directly replicating from the source_dsa
1628                  *
1629                  * and use the tmp_highest_usn because this is what we have just applied
1630                  * to our ldb
1631                  */
1632                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1633                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1634                 break;
1635         }
1636         if (!found) {
1637                 /*
1638                  * here we update the highest_usn and last_sync_success time
1639                  * because we're directly replicating from the source_dsa
1640                  *
1641                  * and use the tmp_highest_usn because this is what we have just applied
1642                  * to our ldb
1643                  */
1644                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1645                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1646                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1647                 ni++;
1648         }
1649
1650         /*
1651          * finally correct the size of the cursors array
1652          */
1653         nuv.ctr.ctr2.count = ni;
1654
1655         /*
1656          * sort the cursors
1657          */
1658         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1659               sizeof(struct drsuapi_DsReplicaCursor2),
1660               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1661
1662         /*
1663          * create the change ldb_message
1664          */
1665         msg = ldb_msg_new(ar);
1666         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1667         msg->dn = ar->search_msg->dn;
1668
1669         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1670                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1671                                        &nuv,
1672                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1673         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1674                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1675                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1676         }
1677         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1678         if (ret != LDB_SUCCESS) {
1679                 return replmd_replicated_request_error(ar, ret);
1680         }
1681         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1682
1683         /*
1684          * now create the new repsFrom value from the given repsFromTo1 structure
1685          */
1686         ZERO_STRUCT(nrf);
1687         nrf.version                                     = 1;
1688         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1689         /* and fix some values... */
1690         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1691         nrf.ctr.ctr1.last_success                       = now;
1692         nrf.ctr.ctr1.last_attempt                       = now;
1693         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1694         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1695
1696         /*
1697          * first see if we already have a repsFrom value for the current source dsa
1698          * if so we'll later replace this value
1699          */
1700         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1701         if (orf_el) {
1702                 for (i=0; i < orf_el->num_values; i++) {
1703                         struct repsFromToBlob *trf;
1704
1705                         trf = talloc(ar, struct repsFromToBlob);
1706                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1707
1708                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1709                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1710                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1711                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1712                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1713                         }
1714
1715                         if (trf->version != 1) {
1716                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1717                         }
1718
1719                         /*
1720                          * we compare the source dsa objectGUID not the invocation_id
1721                          * because we want only one repsFrom value per source dsa
1722                          * and when the invocation_id of the source dsa has changed we don't need 
1723                          * the old repsFrom with the old invocation_id
1724                          */
1725                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1726                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1727                                 talloc_free(trf);
1728                                 continue;
1729                         }
1730
1731                         talloc_free(trf);
1732                         nrf_value = &orf_el->values[i];
1733                         break;
1734                 }
1735
1736                 /*
1737                  * copy over all old values to the new ldb_message
1738                  */
1739                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1740                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1741                 *nrf_el = *orf_el;
1742         }
1743
1744         /*
1745          * if we haven't found an old repsFrom value for the current source dsa
1746          * we'll add a new value
1747          */
1748         if (!nrf_value) {
1749                 struct ldb_val zero_value;
1750                 ZERO_STRUCT(zero_value);
1751                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1752                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1753
1754                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1755         }
1756
1757         /* we now fill the value which is already attached to ldb_message */
1758         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1759                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1760                                        &nrf,
1761                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1762         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1763                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1764                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1765         }
1766
1767         /* 
1768          * the ldb_message_element for the attribute, has all the old values and the new one
1769          * so we'll replace the whole attribute with all values
1770          */
1771         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1772
1773         if (DEBUGLVL(4)) {
1774                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1775                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1776                 talloc_free(s);
1777         }
1778
1779         /* prepare the ldb_modify() request */
1780         ret = ldb_build_mod_req(&change_req,
1781                                 ldb,
1782                                 ar,
1783                                 msg,
1784                                 ar->controls,
1785                                 ar,
1786                                 replmd_replicated_uptodate_modify_callback,
1787                                 ar->req);
1788         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1789
1790         return ldb_next_request(ar->module, change_req);
1791 }
1792
1793 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1794                                                       struct ldb_reply *ares)
1795 {
1796         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1797                                                struct replmd_replicated_request);
1798         int ret;
1799
1800         if (!ares) {
1801                 return ldb_module_done(ar->req, NULL, NULL,
1802                                         LDB_ERR_OPERATIONS_ERROR);
1803         }
1804         if (ares->error != LDB_SUCCESS &&
1805             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1806                 return ldb_module_done(ar->req, ares->controls,
1807                                         ares->response, ares->error);
1808         }
1809
1810         switch (ares->type) {
1811         case LDB_REPLY_ENTRY:
1812                 ar->search_msg = talloc_steal(ar, ares->message);
1813                 break;
1814
1815         case LDB_REPLY_REFERRAL:
1816                 /* we ignore referrals */
1817                 break;
1818
1819         case LDB_REPLY_DONE:
1820                 if (ar->search_msg == NULL) {
1821                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1822                 } else {
1823                         ret = replmd_replicated_uptodate_modify(ar);
1824                 }
1825                 if (ret != LDB_SUCCESS) {
1826                         return ldb_module_done(ar->req, NULL, NULL, ret);
1827                 }
1828         }
1829
1830         talloc_free(ares);
1831         return LDB_SUCCESS;
1832 }
1833
1834
1835 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1836 {
1837         struct ldb_context *ldb;
1838         int ret;
1839         static const char *attrs[] = {
1840                 "replUpToDateVector",
1841                 "repsFrom",
1842                 NULL
1843         };
1844         struct ldb_request *search_req;
1845
1846         ldb = ldb_module_get_ctx(ar->module);
1847         ar->search_msg = NULL;
1848
1849         ret = ldb_build_search_req(&search_req,
1850                                    ldb,
1851                                    ar,
1852                                    ar->objs->partition_dn,
1853                                    LDB_SCOPE_BASE,
1854                                    "(objectClass=*)",
1855                                    attrs,
1856                                    NULL,
1857                                    ar,
1858                                    replmd_replicated_uptodate_search_callback,
1859                                    ar->req);
1860         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1861
1862         return ldb_next_request(ar->module, search_req);
1863 }
1864
1865
1866
1867 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1868 {
1869         struct ldb_context *ldb;
1870         struct dsdb_extended_replicated_objects *objs;
1871         struct replmd_replicated_request *ar;
1872         struct ldb_control **ctrls;
1873         int ret, i;
1874         struct replmd_private *replmd_private = 
1875                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1876
1877         ldb = ldb_module_get_ctx(module);
1878
1879         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1880
1881         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1882         if (!objs) {
1883                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1884                 return LDB_ERR_PROTOCOL_ERROR;
1885         }
1886
1887         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1888                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1889                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1890                 return LDB_ERR_PROTOCOL_ERROR;
1891         }
1892
1893         ar = replmd_ctx_init(module, req);
1894         if (!ar)
1895                 return LDB_ERR_OPERATIONS_ERROR;
1896
1897         /* Set the flags to have the replmd_op_callback run over the full set of objects */
1898         ar->apply_mode = true;
1899         ar->objs = objs;
1900         ar->schema = dsdb_get_schema(ldb);
1901         if (!ar->schema) {
1902                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1903                 talloc_free(ar);
1904                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
1905                 return LDB_ERR_CONSTRAINT_VIOLATION;
1906         }
1907
1908         ctrls = req->controls;
1909
1910         if (req->controls) {
1911                 req->controls = talloc_memdup(ar, req->controls,
1912                                               talloc_get_size(req->controls));
1913                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1914         }
1915
1916         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1917         if (ret != LDB_SUCCESS) {
1918                 return ret;
1919         }
1920
1921         ar->controls = req->controls;
1922         req->controls = ctrls;
1923
1924         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1925
1926         /* save away the linked attributes for the end of the
1927            transaction */
1928         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1929                 struct la_entry *la_entry;
1930
1931                 if (replmd_private->la_ctx == NULL) {
1932                         replmd_private->la_ctx = talloc_new(replmd_private);
1933                 }
1934                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
1935                 if (la_entry == NULL) {
1936                         ldb_oom(ldb);
1937                         return LDB_ERR_OPERATIONS_ERROR;
1938                 }
1939                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1940                 if (la_entry->la == NULL) {
1941                         talloc_free(la_entry);
1942                         ldb_oom(ldb);
1943                         return LDB_ERR_OPERATIONS_ERROR;
1944                 }
1945                 *la_entry->la = ar->objs->linked_attributes[i];
1946
1947                 /* we need to steal the non-scalars so they stay
1948                    around until the end of the transaction */
1949                 talloc_steal(la_entry->la, la_entry->la->identifier);
1950                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1951
1952                 DLIST_ADD(replmd_private->la_list, la_entry);
1953         }
1954
1955         return replmd_replicated_apply_next(ar);
1956 }
1957
1958 /*
1959   process one linked attribute structure
1960  */
1961 static int replmd_process_linked_attribute(struct ldb_module *module,
1962                                            struct la_entry *la_entry)
1963 {                                          
1964         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1965         struct ldb_context *ldb = ldb_module_get_ctx(module);
1966         struct dsdb_schema *schema = dsdb_get_schema(ldb);
1967         struct drsuapi_DsReplicaObjectIdentifier3 target;
1968         struct ldb_message *msg;
1969         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1970         struct ldb_request *mod_req;
1971         int ret;
1972         const struct dsdb_attribute *attr;
1973         struct ldb_dn *target_dn;
1974         struct dsdb_dn *dsdb_dn;
1975         uint64_t seq_num = 0;
1976         struct drsuapi_DsReplicaAttribute drs;
1977         struct drsuapi_DsAttributeValue val;
1978         struct ldb_message_element el;
1979         const struct ldb_val *guid;
1980         WERROR status;
1981
1982         drs.value_ctr.num_values = 1;
1983         drs.value_ctr.values = &val;
1984         val.blob = la->value.blob;
1985
1986 /*
1987 linked_attributes[0]:                                                     
1988      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1989         identifier               : *                                      
1990             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1991                 __ndr_size               : 0x0000003a (58)                
1992                 __ndr_size_sid           : 0x00000000 (0)                 
1993                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1994                 sid                      : S-0-0                               
1995                 __ndr_size_dn            : 0x00000000 (0)                      
1996                 dn                       : ''                                  
1997         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1998         value: struct drsuapi_DsAttributeValue                                 
1999             __ndr_size               : 0x0000007e (126)                        
2000             blob                     : *                                       
2001                 blob                     : DATA_BLOB length=126                
2002         flags                    : 0x00000001 (1)                              
2003                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2004         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2005         meta_data: struct drsuapi_DsReplicaMetaData                            
2006             version                  : 0x00000015 (21)                         
2007             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2008             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2009             originating_usn          : 0x000000000001e19c (123292)             
2010
2011 (for cases where the link is to a normal DN)
2012      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2013         __ndr_size               : 0x0000007e (126)                            
2014         __ndr_size_sid           : 0x0000001c (28)                             
2015         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2016         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2017         __ndr_size_dn            : 0x00000022 (34)                                
2018         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2019  */
2020         
2021         /* find the attribute being modified */
2022         attr = dsdb_attribute_by_attributeID_id(schema, la->attid);
2023         if (attr == NULL) {
2024                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2025                 talloc_free(tmp_ctx);
2026                 return LDB_ERR_OPERATIONS_ERROR;
2027         }
2028
2029         status = attr->syntax->drsuapi_to_ldb(ldb, schema, attr, &drs, tmp_ctx, &el);
2030
2031         /* construct a modify request for this attribute change */
2032         msg = ldb_msg_new(tmp_ctx);
2033         if (!msg) {
2034                 ldb_oom(ldb);
2035                 talloc_free(tmp_ctx);
2036                 return LDB_ERR_OPERATIONS_ERROR;
2037         }
2038
2039         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2040                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2041         if (ret != LDB_SUCCESS) {
2042                 talloc_free(tmp_ctx);
2043                 return ret;
2044         }
2045
2046         el.name = attr->lDAPDisplayName;
2047         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2048                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_ADD);
2049         } else {
2050                 ret = ldb_msg_add(msg, &el, LDB_FLAG_MOD_DELETE);
2051         }
2052         if (ret != LDB_SUCCESS) {
2053                 talloc_free(tmp_ctx);
2054                 return ret;
2055         }
2056
2057         dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, &el.values[0], attr->syntax->ldap_oid);
2058         if (!dsdb_dn) {
2059                 DEBUG(0,(__location__ ": Failed to parse just-generated DN\n"));
2060                 talloc_free(tmp_ctx);
2061                 return LDB_ERR_INVALID_DN_SYNTAX;
2062         }
2063
2064         guid = ldb_dn_get_extended_component(dsdb_dn->dn, "GUID");
2065         if (!guid) {
2066                 DEBUG(0,(__location__ ": Failed to parse GUID from just-generated DN\n"));
2067                 talloc_free(tmp_ctx);
2068                 return LDB_ERR_INVALID_DN_SYNTAX;
2069         }
2070
2071         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, ldb_binary_encode(tmp_ctx, *guid), &target_dn);
2072         if (ret != LDB_SUCCESS) {
2073                 /* If this proves to be a problem in the future, then
2074                  * just remove the return - perhaps we can just use
2075                  * the details the replication peer supplied */
2076
2077                 DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
2078                 talloc_free(tmp_ctx);
2079                 return LDB_ERR_OPERATIONS_ERROR;
2080         } else {
2081
2082                 /* Now update with full DN we just found in the DB (including extended components) */
2083                 dsdb_dn->dn = target_dn;
2084                 /* Now make a linearized version, using the original binary components (if any) */
2085                 el.values[0] = data_blob_string_const(dsdb_dn_get_extended_linearized(tmp_ctx, dsdb_dn, 1));
2086         }
2087
2088         ret = replmd_update_rpmd(module, schema, msg, &seq_num);
2089         if (ret != LDB_SUCCESS) {
2090                 talloc_free(tmp_ctx);
2091                 return ret;
2092         }
2093
2094         /* we only change whenChanged and uSNChanged if the seq_num
2095            has changed */
2096         if (seq_num != 0) {
2097                 time_t t = time(NULL);
2098
2099                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
2100                         talloc_free(tmp_ctx);
2101                         return LDB_ERR_OPERATIONS_ERROR;
2102                 }
2103
2104                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
2105                         talloc_free(tmp_ctx);
2106                         return LDB_ERR_OPERATIONS_ERROR;
2107                 }
2108         }
2109
2110         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2111                                 msg,
2112                                 NULL,
2113                                 NULL, 
2114                                 ldb_op_default_callback,
2115                                 NULL);
2116         if (ret != LDB_SUCCESS) {
2117                 talloc_free(tmp_ctx);
2118                 return ret;
2119         }
2120         talloc_steal(mod_req, msg);
2121
2122         if (DEBUGLVL(4)) {
2123                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2124                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2125         }
2126
2127         /* Run the new request */
2128         ret = ldb_next_request(module, mod_req);
2129
2130         /* we need to wait for this to finish, as we are being called
2131            from the synchronous end_transaction hook of this module */
2132         if (ret == LDB_SUCCESS) {
2133                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2134         }
2135
2136         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2137                 /* the link destination exists, we need to update it
2138                  * by deleting the old one for the same DN then adding
2139                  * the new one */
2140                 msg->elements = talloc_realloc(msg, msg->elements,
2141                                                struct ldb_message_element,
2142                                                msg->num_elements+1);
2143                 if (msg->elements == NULL) {
2144                         ldb_oom(ldb);
2145                         talloc_free(tmp_ctx);
2146                         return LDB_ERR_OPERATIONS_ERROR;
2147                 }
2148                 /* this relies on the backend matching the old entry
2149                    only by the DN portion of the extended DN */
2150                 msg->elements[1] = msg->elements[0];
2151                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2152                 msg->num_elements++;
2153
2154                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2155                                         msg,
2156                                         NULL,
2157                                         NULL, 
2158                                         ldb_op_default_callback,
2159                                         NULL);
2160                 if (ret != LDB_SUCCESS) {
2161                         talloc_free(tmp_ctx);
2162                         return ret;
2163                 }
2164
2165                 /* Run the new request */
2166                 ret = ldb_next_request(module, mod_req);
2167                 
2168                 if (ret == LDB_SUCCESS) {
2169                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2170                 }
2171         }
2172
2173         if (ret != LDB_SUCCESS) {
2174                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2175                           ldb_errstring(ldb),
2176                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2177                 ret = LDB_SUCCESS;
2178         }
2179         
2180         talloc_free(tmp_ctx);
2181
2182         return ret;     
2183 }
2184
2185 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2186 {
2187         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2188                 return replmd_extended_replicated_objects(module, req);
2189         }
2190
2191         return ldb_next_request(module, req);
2192 }
2193
2194
2195 /*
2196   we hook into the transaction operations to allow us to 
2197   perform the linked attribute updates at the end of the whole
2198   transaction. This allows a forward linked attribute to be created
2199   before the object is created. During a vampire, w2k8 sends us linked
2200   attributes before the objects they are part of.
2201  */
2202 static int replmd_start_transaction(struct ldb_module *module)
2203 {
2204         /* create our private structure for this transaction */
2205         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2206                                                                 struct replmd_private);
2207         talloc_free(replmd_private->la_ctx);
2208         replmd_private->la_list = NULL;
2209         replmd_private->la_ctx = NULL;
2210
2211         /* free any leftover mod_usn records from cancelled
2212            transactions */
2213         while (replmd_private->ncs) {
2214                 struct nc_entry *e = replmd_private->ncs;
2215                 DLIST_REMOVE(replmd_private->ncs, e);
2216                 talloc_free(e);
2217         }
2218
2219         return ldb_next_start_trans(module);
2220 }
2221
2222 /*
2223   on prepare commit we loop over our queued la_context structures and
2224   apply each of them  
2225  */
2226 static int replmd_prepare_commit(struct ldb_module *module)
2227 {
2228         struct replmd_private *replmd_private = 
2229                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2230         struct la_entry *la, *prev;
2231         int ret;
2232
2233         /* walk the list backwards, to do the first entry first, as we
2234          * added the entries with DLIST_ADD() which puts them at the
2235          * start of the list */
2236         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2237
2238         for (; la; la=prev) {
2239                 prev = la->prev;
2240                 DLIST_REMOVE(replmd_private->la_list, la);
2241                 ret = replmd_process_linked_attribute(module, la);
2242                 if (ret != LDB_SUCCESS) {
2243                         talloc_free(replmd_private->la_ctx);
2244                         replmd_private->la_list = NULL;
2245                         replmd_private->la_ctx = NULL;
2246                         return ret;
2247                 }
2248         }
2249
2250         talloc_free(replmd_private->la_ctx);
2251         replmd_private->la_list = NULL;
2252         replmd_private->la_ctx = NULL;
2253
2254         /* possibly change @REPLCHANGED */
2255         ret = replmd_notify_store(module);
2256         if (ret != LDB_SUCCESS) {
2257                 return ret;
2258         }
2259         
2260         return ldb_next_prepare_commit(module);
2261 }
2262
2263 static int replmd_del_transaction(struct ldb_module *module)
2264 {
2265         struct replmd_private *replmd_private = 
2266                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2267         talloc_free(replmd_private->la_ctx);
2268         replmd_private->la_list = NULL;
2269         replmd_private->la_ctx = NULL;
2270         return ldb_next_del_trans(module);
2271 }
2272
2273
2274 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2275         .name          = "repl_meta_data",
2276         .init_context      = replmd_init,
2277         .add               = replmd_add,
2278         .modify            = replmd_modify,
2279         .rename            = replmd_rename,
2280         .extended          = replmd_extended,
2281         .start_transaction = replmd_start_transaction,
2282         .prepare_commit    = replmd_prepare_commit,
2283         .del_transaction   = replmd_del_transaction,
2284 };