s4-ldb: sort replPropertyMetaData by attid
[amitay/samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct ldb_dn *dn;
55                 struct GUID guid;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
/*
 * Doubly linked list node holding a pointer to one linked attribute.
 */
struct la_entry {
        struct la_entry *next;
        struct la_entry *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
 * Per-request context of this module, created by replmd_ctx_init().
 */
struct replmd_replicated_request {
        /* the module and the original request this context belongs to */
        struct ldb_module *module;
        struct ldb_request *req;

        /* schema used while processing the request */
        const struct dsdb_schema *schema;

        /* NOTE(review): presumably the replicated objects being applied - confirm */
        struct dsdb_extended_replicated_objects *objs;

        /* the controls we pass down */
        struct ldb_control **controls;

        /* NOTE(review): presumably the index of the object currently processed - confirm */
        uint32_t index_current;

        /* NOTE(review): presumably the result of a search for the current object - confirm */
        struct ldb_message *search_msg;
};
80
81
82 /*
83   initialise the module
84   allocate the private structure and build the list
85   of partition DNs for use by replmd_notify()
86  */
87 static int replmd_init(struct ldb_module *module)
88 {
89         struct replmd_private *replmd_private;
90         struct ldb_context *ldb = ldb_module_get_ctx(module);
91
92         replmd_private = talloc_zero(module, struct replmd_private);
93         if (replmd_private == NULL) {
94                 ldb_oom(ldb);
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97         ldb_module_set_private(module, replmd_private);
98
99         return ldb_next_init(module);
100 }
101
102
103 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
104 {
105         return ldb_dn_compare(n1->dn, n2->dn);
106 }
107
108 /*
109   build the list of partition DNs for use by replmd_notify()
110  */
111 static int replmd_load_NCs(struct ldb_module *module)
112 {
113         const char *attrs[] = { "namingContexts", NULL };
114         struct ldb_result *res = NULL;
115         int i, ret;
116         TALLOC_CTX *tmp_ctx;
117         struct ldb_context *ldb;
118         struct ldb_message_element *el;
119         struct replmd_private *replmd_private = 
120                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
121
122         if (replmd_private->ncs != NULL) {
123                 return LDB_SUCCESS;
124         }
125
126         ldb = ldb_module_get_ctx(module);
127         tmp_ctx = talloc_new(module);
128
129         /* load the list of naming contexts */
130         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
131                          LDB_SCOPE_BASE, attrs, NULL);
132         if (ret != LDB_SUCCESS ||
133             res->count != 1) {
134                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137
138         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
139         if (el == NULL) {
140                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
141                 return LDB_ERR_OPERATIONS_ERROR;                
142         }
143
144         replmd_private->num_ncs = el->num_values;
145         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
146                                            replmd_private->num_ncs);
147         if (replmd_private->ncs == NULL) {
148                 ldb_oom(ldb);
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151
152         for (i=0; i<replmd_private->num_ncs; i++) {
153                 replmd_private->ncs[i].dn = 
154                         ldb_dn_from_ldb_val(replmd_private->ncs, 
155                                             ldb, &el->values[i]);
156                 replmd_private->ncs[i].mod_usn = 0;
157         }
158
159         talloc_free(res);
160
161         /* now find the GUIDs of each of those DNs */
162         for (i=0; i<replmd_private->num_ncs; i++) {
163                 const char *attrs2[] = { "objectGUID", NULL };
164                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
165                                  LDB_SCOPE_BASE, attrs2, NULL);
166                 if (ret != LDB_SUCCESS ||
167                     res->count != 1) {
168                         /* this happens when the schema is first being
169                            setup */
170                         talloc_free(replmd_private->ncs);
171                         replmd_private->ncs = NULL;
172                         replmd_private->num_ncs = 0;
173                         talloc_free(tmp_ctx);
174                         return LDB_SUCCESS;
175                 }
176                 replmd_private->ncs[i].guid = 
177                         samdb_result_guid(res->msgs[0], "objectGUID");
178                 talloc_free(res);
179         }       
180
181         /* sort the NCs into order, most to least specific */
182         qsort(replmd_private->ncs, replmd_private->num_ncs,
183               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
184
185         
186         talloc_free(tmp_ctx);
187         
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193  * notify the repl task that a object has changed. The notifies are
194  * gathered up in the replmd_private structure then written to the
195  * @REPLCHANGED object in each partition during the prepare_commit
196  */
197 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
198 {
199         int ret, i;
200         struct replmd_private *replmd_private = 
201                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
202
203         ret = replmd_load_NCs(module);
204         if (ret != LDB_SUCCESS) {
205                 return ret;
206         }
207         if (replmd_private->num_ncs == 0) {
208                 return LDB_SUCCESS;
209         }
210
211         for (i=0; i<replmd_private->num_ncs; i++) {
212                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
213                         break;
214                 }
215         }
216         if (i == replmd_private->num_ncs) {
217                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
218                          ldb_dn_get_linearized(dn)));
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         if (uSN > replmd_private->ncs[i].mod_usn) {
223             replmd_private->ncs[i].mod_usn = uSN;
224         }
225
226         return LDB_SUCCESS;
227 }
228
229
230 /*
231  * update a @REPLCHANGED record in each partition if there have been
232  * any writes of replicated data in the partition
233  */
234 static int replmd_notify_store(struct ldb_module *module)
235 {
236         int i;
237         struct replmd_private *replmd_private = 
238                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
239         struct ldb_context *ldb = ldb_module_get_ctx(module);
240
241         for (i=0; i<replmd_private->num_ncs; i++) {
242                 int ret;
243
244                 if (replmd_private->ncs[i].mod_usn == 0) {
245                         /* this partition has not changed in this
246                            transaction */
247                         continue;
248                 }
249
250                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
251                                               replmd_private->ncs[i].mod_usn);
252                 if (ret != LDB_SUCCESS) {
253                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
254                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
255                         return ret;
256                 }
257         }
258
259         return LDB_SUCCESS;
260 }
261
262
263 /*
264   created a replmd_replicated_request context
265  */
266 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
267                                                          struct ldb_request *req)
268 {
269         struct ldb_context *ldb;
270         struct replmd_replicated_request *ac;
271
272         ldb = ldb_module_get_ctx(module);
273
274         ac = talloc_zero(req, struct replmd_replicated_request);
275         if (ac == NULL) {
276                 ldb_oom(ldb);
277                 return NULL;
278         }
279
280         ac->module = module;
281         ac->req = req;
282         return ac;
283 }
284
285 /*
286   add a time element to a record
287 */
288 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
289 {
290         struct ldb_message_element *el;
291         char *s;
292
293         if (ldb_msg_find_element(msg, attr) != NULL) {
294                 return LDB_SUCCESS;
295         }
296
297         s = ldb_timestring(msg, t);
298         if (s == NULL) {
299                 return LDB_ERR_OPERATIONS_ERROR;
300         }
301
302         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305
306         el = ldb_msg_find_element(msg, attr);
307         /* always set as replace. This works because on add ops, the flag
308            is ignored */
309         el->flags = LDB_FLAG_MOD_REPLACE;
310
311         return LDB_SUCCESS;
312 }
313
314 /*
315   add a uint64_t element to a record
316 */
317 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
318 {
319         struct ldb_message_element *el;
320
321         if (ldb_msg_find_element(msg, attr) != NULL) {
322                 return LDB_SUCCESS;
323         }
324
325         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
326                 return LDB_ERR_OPERATIONS_ERROR;
327         }
328
329         el = ldb_msg_find_element(msg, attr);
330         /* always set as replace. This works because on add ops, the flag
331            is ignored */
332         el->flags = LDB_FLAG_MOD_REPLACE;
333
334         return LDB_SUCCESS;
335 }
336
337 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
338                                                    const struct replPropertyMetaData1 *m2,
339                                                    const uint32_t *rdn_attid)
340 {
341         if (m1->attid == m2->attid) {
342                 return 0;
343         }
344
345         /*
346          * the rdn attribute should be at the end!
347          * so we need to return a value greater than zero
348          * which means m1 is greater than m2
349          */
350         if (m1->attid == *rdn_attid) {
351                 return 1;
352         }
353
354         /*
355          * the rdn attribute should be at the end!
356          * so we need to return a value less than zero
357          * which means m2 is greater than m1
358          */
359         if (m2->attid == *rdn_attid) {
360                 return -1;
361         }
362
363         return m1->attid - m2->attid;
364 }
365
366 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
367                                                  const uint32_t *rdn_attid)
368 {
369         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
370                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
371 }
372
373 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
374                                                  const struct ldb_message_element *e2,
375                                                  const struct dsdb_schema *schema)
376 {
377         const struct dsdb_attribute *a1;
378         const struct dsdb_attribute *a2;
379
380         /* 
381          * TODO: make this faster by caching the dsdb_attribute pointer
382          *       on the ldb_messag_element
383          */
384
385         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
386         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
387
388         /*
389          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
390          *       in the schema
391          */
392         if (!a1 || !a2) {
393                 return strcasecmp(e1->name, e2->name);
394         }
395
396         return a1->attributeID_id - a2->attributeID_id;
397 }
398
399 static void replmd_ldb_message_sort(struct ldb_message *msg,
400                                     const struct dsdb_schema *schema)
401 {
402         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
403                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
404 }
405
406 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
407 {
408         struct ldb_context *ldb;
409         struct replmd_replicated_request *ac;
410
411         ac = talloc_get_type(req->context, struct replmd_replicated_request);
412         ldb = ldb_module_get_ctx(ac->module);
413
414         if (!ares) {
415                 return ldb_module_done(ac->req, NULL, NULL,
416                                         LDB_ERR_OPERATIONS_ERROR);
417         }
418         if (ares->error != LDB_SUCCESS) {
419                 return ldb_module_done(ac->req, ares->controls,
420                                         ares->response, ares->error);
421         }
422
423         if (ares->type != LDB_REPLY_DONE) {
424                 ldb_set_errstring(ldb,
425                                   "invalid ldb_reply_type in callback");
426                 talloc_free(ares);
427                 return ldb_module_done(ac->req, NULL, NULL,
428                                         LDB_ERR_OPERATIONS_ERROR);
429         }
430
431         return ldb_module_done(ac->req, ares->controls,
432                                 ares->response, LDB_SUCCESS);
433 }
434
435 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
436 {
437         struct ldb_context *ldb;
438         struct replmd_replicated_request *ac;
439         const struct dsdb_schema *schema;
440         enum ndr_err_code ndr_err;
441         struct ldb_request *down_req;
442         struct ldb_message *msg;
443         const struct dsdb_attribute *rdn_attr = NULL;
444         struct GUID guid;
445         struct ldb_val guid_value;
446         struct replPropertyMetaDataBlob nmd;
447         struct ldb_val nmd_value;
448         uint64_t seq_num;
449         const struct GUID *our_invocation_id;
450         time_t t = time(NULL);
451         NTTIME now;
452         char *time_str;
453         int ret;
454         uint32_t i, ni=0;
455
456         /* do not manipulate our control entries */
457         if (ldb_dn_is_special(req->op.add.message->dn)) {
458                 return ldb_next_request(module, req);
459         }
460
461         ldb = ldb_module_get_ctx(module);
462
463         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
464
465         schema = dsdb_get_schema(ldb);
466         if (!schema) {
467                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
468                               "replmd_add: no dsdb_schema loaded");
469                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
470                 return LDB_ERR_CONSTRAINT_VIOLATION;
471         }
472
473         ac = replmd_ctx_init(module, req);
474         if (!ac) {
475                 return LDB_ERR_OPERATIONS_ERROR;
476         }
477
478         ac->schema = schema;
479
480         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
481                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
482                               "replmd_add: it's not allowed to add an object with objectGUID\n");
483                 return LDB_ERR_UNWILLING_TO_PERFORM;
484         }
485
486         /* Get a sequence number from the backend */
487         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
488         if (ret != LDB_SUCCESS) {
489                 return ret;
490         }
491
492         /* a new GUID */
493         guid = GUID_random();
494
495         /* get our invocationId */
496         our_invocation_id = samdb_ntds_invocation_id(ldb);
497         if (!our_invocation_id) {
498                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
499                               "replmd_add: unable to find invocationId\n");
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502
503         /* we have to copy the message as the caller might have it as a const */
504         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
505         if (msg == NULL) {
506                 ldb_oom(ldb);
507                 return LDB_ERR_OPERATIONS_ERROR;
508         }
509
510         /* generated times */
511         unix_to_nt_time(&now, t);
512         time_str = ldb_timestring(msg, t);
513         if (!time_str) {
514                 return LDB_ERR_OPERATIONS_ERROR;
515         }
516
517         /* 
518          * remove autogenerated attributes
519          */
520         ldb_msg_remove_attr(msg, "whenCreated");
521         ldb_msg_remove_attr(msg, "whenChanged");
522         ldb_msg_remove_attr(msg, "uSNCreated");
523         ldb_msg_remove_attr(msg, "uSNChanged");
524         ldb_msg_remove_attr(msg, "replPropertyMetaData");
525
526         if (!ldb_msg_find_element(req->op.add.message, "instanceType")) {
527                 ret = ldb_msg_add_fmt(msg, "instanceType", "%u", INSTANCE_TYPE_WRITE);
528                 if (ret != LDB_SUCCESS) {
529                         ldb_oom(ldb);
530                         return LDB_ERR_OPERATIONS_ERROR;
531                 }
532         }
533
534         /*
535          * readd replicated attributes
536          */
537         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
538         if (ret != LDB_SUCCESS) {
539                 ldb_oom(ldb);
540                 return LDB_ERR_OPERATIONS_ERROR;
541         }
542
543         /* build the replication meta_data */
544         ZERO_STRUCT(nmd);
545         nmd.version             = 1;
546         nmd.ctr.ctr1.count      = msg->num_elements;
547         nmd.ctr.ctr1.array      = talloc_array(msg,
548                                                struct replPropertyMetaData1,
549                                                nmd.ctr.ctr1.count);
550         if (!nmd.ctr.ctr1.array) {
551                 ldb_oom(ldb);
552                 return LDB_ERR_OPERATIONS_ERROR;
553         }
554
555         for (i=0; i < msg->num_elements; i++) {
556                 struct ldb_message_element *e = &msg->elements[i];
557                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
558                 const struct dsdb_attribute *sa;
559
560                 if (e->name[0] == '@') continue;
561
562                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
563                 if (!sa) {
564                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
565                                       "replmd_add: attribute '%s' not defined in schema\n",
566                                       e->name);
567                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
568                 }
569
570                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
571                         /* if the attribute is not replicated (0x00000001)
572                          * or constructed (0x00000004) it has no metadata
573                          */
574                         continue;
575                 }
576
577                 m->attid                        = sa->attributeID_id;
578                 m->version                      = 1;
579                 m->originating_change_time      = now;
580                 m->originating_invocation_id    = *our_invocation_id;
581                 m->originating_usn              = seq_num;
582                 m->local_usn                    = seq_num;
583                 ni++;
584
585                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
586                         rdn_attr = sa;
587                 }
588         }
589
590         /* fix meta data count */
591         nmd.ctr.ctr1.count = ni;
592
593         /*
594          * sort meta data array, and move the rdn attribute entry to the end
595          */
596         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
597
598         /* generated NDR encoded values */
599         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
600                                        NULL,
601                                        &guid,
602                                        (ndr_push_flags_fn_t)ndr_push_GUID);
603         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
604                 ldb_oom(ldb);
605                 return LDB_ERR_OPERATIONS_ERROR;
606         }
607         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
608                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
609                                        &nmd,
610                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
611         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
612                 ldb_oom(ldb);
613                 return LDB_ERR_OPERATIONS_ERROR;
614         }
615
616         /*
617          * add the autogenerated values
618          */
619         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
620         if (ret != LDB_SUCCESS) {
621                 ldb_oom(ldb);
622                 return LDB_ERR_OPERATIONS_ERROR;
623         }
624         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
625         if (ret != LDB_SUCCESS) {
626                 ldb_oom(ldb);
627                 return LDB_ERR_OPERATIONS_ERROR;
628         }
629         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
630         if (ret != LDB_SUCCESS) {
631                 ldb_oom(ldb);
632                 return LDB_ERR_OPERATIONS_ERROR;
633         }
634         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
635         if (ret != LDB_SUCCESS) {
636                 ldb_oom(ldb);
637                 return LDB_ERR_OPERATIONS_ERROR;
638         }
639         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
640         if (ret != LDB_SUCCESS) {
641                 ldb_oom(ldb);
642                 return LDB_ERR_OPERATIONS_ERROR;
643         }
644
645         /*
646          * sort the attributes by attid before storing the object
647          */
648         replmd_ldb_message_sort(msg, schema);
649
650         ret = ldb_build_add_req(&down_req, ldb, ac,
651                                 msg,
652                                 req->controls,
653                                 ac, replmd_op_callback,
654                                 req);
655         if (ret != LDB_SUCCESS) {
656                 return ret;
657         }
658
659         ret = replmd_notify(module, msg->dn, seq_num);
660         if (ret != LDB_SUCCESS) {
661                 return ret;
662         }
663
664         /* go on with the call chain */
665         return ldb_next_request(module, down_req);
666 }
667
668
669 /*
670  * update the replPropertyMetaData for one element
671  */
672 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
673                                       struct ldb_message *msg,
674                                       struct ldb_message_element *el,
675                                       struct replPropertyMetaDataBlob *omd,
676                                       struct dsdb_schema *schema,
677                                       uint64_t *seq_num,
678                                       const struct GUID *our_invocation_id,
679                                       NTTIME now)
680 {
681         int i;
682         const struct dsdb_attribute *a;
683         struct replPropertyMetaData1 *md1;
684
685         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
686         if (a == NULL) {
687                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
688                          el->name));
689                 return LDB_ERR_OPERATIONS_ERROR;
690         }
691
692         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
693                 /* if the attribute is not replicated (0x00000001)
694                  * or constructed (0x00000004) it has no metadata
695                  */
696                 return LDB_SUCCESS;
697         }
698
699         for (i=0; i<omd->ctr.ctr1.count; i++) {
700                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
701         }
702         if (i == omd->ctr.ctr1.count) {
703                 /* we need to add a new one */
704                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
705                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
706                 if (omd->ctr.ctr1.array == NULL) {
707                         ldb_oom(ldb);
708                         return LDB_ERR_OPERATIONS_ERROR;
709                 }
710                 omd->ctr.ctr1.count++;
711                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
712         }
713
714         /* Get a new sequence number from the backend. We only do this
715          * if we have a change that requires a new
716          * replPropertyMetaData element 
717          */
718         if (*seq_num == 0) {
719                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
720                 if (ret != LDB_SUCCESS) {
721                         return LDB_ERR_OPERATIONS_ERROR;
722                 }
723         }
724
725         md1 = &omd->ctr.ctr1.array[i];
726         md1->version++;
727         md1->attid                     = a->attributeID_id;
728         md1->originating_change_time   = now;
729         md1->originating_invocation_id = *our_invocation_id;
730         md1->originating_usn           = *seq_num;
731         md1->local_usn                 = *seq_num;
732         
733         return LDB_SUCCESS;
734 }
735
736 /*
737  * update the replPropertyMetaData object each time we modify an
738  * object. This is needed for DRS replication, as the merge on the
739  * client is based on this object 
740  */
741 static int replmd_update_rpmd(struct ldb_module *module, 
742                               struct ldb_message *msg, uint64_t *seq_num)
743 {
744         const struct ldb_val *omd_value;
745         enum ndr_err_code ndr_err;
746         struct replPropertyMetaDataBlob omd;
747         int i;
748         struct dsdb_schema *schema;
749         time_t t = time(NULL);
750         NTTIME now;
751         const struct GUID *our_invocation_id;
752         int ret;
753         const char *attrs[] = { "replPropertyMetaData" , NULL };
754         struct ldb_result *res;
755         struct ldb_context *ldb;
756
757         ldb = ldb_module_get_ctx(module);
758
759         our_invocation_id = samdb_ntds_invocation_id(ldb);
760         if (!our_invocation_id) {
761                 /* this happens during an initial vampire while
762                    updating the schema */
763                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
764                 return LDB_SUCCESS;
765         }
766
767         unix_to_nt_time(&now, t);
768
769         /* search for the existing replPropertyMetaDataBlob */
770         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
771         if (ret != LDB_SUCCESS || res->count < 1) {
772                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
773                          ldb_dn_get_linearized(msg->dn)));
774                 return LDB_ERR_OPERATIONS_ERROR;
775         }
776                 
777
778         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
779         if (!omd_value) {
780                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
781                          ldb_dn_get_linearized(msg->dn)));
782                 return LDB_ERR_OPERATIONS_ERROR;
783         }
784
785         ndr_err = ndr_pull_struct_blob(omd_value, msg,
786                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
787                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
788         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
789                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
790                          ldb_dn_get_linearized(msg->dn)));
791                 return LDB_ERR_OPERATIONS_ERROR;
792         }
793
794         if (omd.version != 1) {
795                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
796                          omd.version, ldb_dn_get_linearized(msg->dn)));
797                 return LDB_ERR_OPERATIONS_ERROR;
798         }
799
800         schema = dsdb_get_schema(ldb);
801
802         for (i=0; i<msg->num_elements; i++) {
803                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
804                                                  our_invocation_id, now);
805                 if (ret != LDB_SUCCESS) {
806                         return ret;
807                 }
808         }
809
810         /*
811          * replmd_update_rpmd_element has done an update if the
812          * seq_num is set
813          */
814         if (*seq_num != 0) {
815                 struct ldb_val *md_value;
816                 struct ldb_message_element *el;
817                 const char *rdn_name;
818                 const struct dsdb_attribute *rdn_sa;
819
820                 rdn_name = ldb_dn_get_rdn_name(msg->dn);
821                 if (!rdn_name) {
822                         DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(msg->dn)));
823                         return LDB_ERR_OPERATIONS_ERROR;
824                 }
825                 rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
826                 if (rdn_sa == NULL) {
827                         DEBUG(0,(__location__ ": sa not found for rDN %s in %s?\n", 
828                                  rdn_name, ldb_dn_get_linearized(msg->dn)));
829                         return LDB_ERR_OPERATIONS_ERROR;
830                 }
831
832                 md_value = talloc(msg, struct ldb_val);
833                 if (md_value == NULL) {
834                         ldb_oom(ldb);
835                         return LDB_ERR_OPERATIONS_ERROR;
836                 }
837
838                 replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, &rdn_sa->attributeID_id);
839
840                 ndr_err = ndr_push_struct_blob(md_value, msg, 
841                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
842                                                &omd,
843                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
844                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
845                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
846                                  ldb_dn_get_linearized(msg->dn)));
847                         return LDB_ERR_OPERATIONS_ERROR;
848                 }
849
850                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
851                 if (ret != LDB_SUCCESS) {
852                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
853                                  ldb_dn_get_linearized(msg->dn)));
854                         return ret;
855                 }
856
857                 ret = replmd_notify(module, msg->dn, *seq_num);
858                 if (ret != LDB_SUCCESS) {
859                         return ret;
860                 }
861
862                 el->num_values = 1;
863                 el->values = md_value;
864         }
865
866         return LDB_SUCCESS;     
867 }
868
869
870 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
871 {
872         struct ldb_context *ldb;
873         struct replmd_replicated_request *ac;
874         const struct dsdb_schema *schema;
875         struct ldb_request *down_req;
876         struct ldb_message *msg;
877         int ret;
878         time_t t = time(NULL);
879         uint64_t seq_num = 0;
880
881         /* do not manipulate our control entries */
882         if (ldb_dn_is_special(req->op.mod.message->dn)) {
883                 return ldb_next_request(module, req);
884         }
885
886         ldb = ldb_module_get_ctx(module);
887
888         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
889
890         schema = dsdb_get_schema(ldb);
891         if (!schema) {
892                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
893                               "replmd_modify: no dsdb_schema loaded");
894                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
895                 return LDB_ERR_CONSTRAINT_VIOLATION;
896         }
897
898         ac = replmd_ctx_init(module, req);
899         if (!ac) {
900                 return LDB_ERR_OPERATIONS_ERROR;
901         }
902
903         ac->schema = schema;
904
905         /* we have to copy the message as the caller might have it as a const */
906         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
907         if (msg == NULL) {
908                 talloc_free(ac);
909                 return LDB_ERR_OPERATIONS_ERROR;
910         }
911
912         /* TODO:
913          * - get the whole old object
914          * - if the old object doesn't exist report an error
915          * - give an error when a readonly attribute should
916          *   be modified
917          * - merge the changed into the old object
918          *   if the caller set values to the same value
919          *   ignore the attribute, return success when no
920          *   attribute was changed
921          */
922
923         ret = replmd_update_rpmd(module, msg, &seq_num);
924         if (ret != LDB_SUCCESS) {
925                 return ret;
926         }
927
928         /* TODO:
929          * - replace the old object with the newly constructed one
930          */
931
932         ret = ldb_build_mod_req(&down_req, ldb, ac,
933                                 msg,
934                                 req->controls,
935                                 ac, replmd_op_callback,
936                                 req);
937         if (ret != LDB_SUCCESS) {
938                 return ret;
939         }
940         talloc_steal(down_req, msg);
941
942         /* we only change whenChanged and uSNChanged if the seq_num
943            has changed */
944         if (seq_num != 0) {
945                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
946                         talloc_free(ac);
947                         return LDB_ERR_OPERATIONS_ERROR;
948                 }
949
950                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
951                         talloc_free(ac);
952                         return LDB_ERR_OPERATIONS_ERROR;
953                 }
954         }
955
956         /* go on with the call chain */
957         return ldb_next_request(module, down_req);
958 }
959
960
961 /*
962   handle a rename request
963
964   On a rename we need to do an extra ldb_modify which sets the
965   whenChanged and uSNChanged attributes
966  */
967 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
968 {
969         struct ldb_context *ldb;
970         int ret, i;
971         time_t t = time(NULL);
972         uint64_t seq_num = 0;
973         struct ldb_message *msg;
974         struct replmd_private *replmd_private = 
975                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
976
977         /* do not manipulate our control entries */
978         if (ldb_dn_is_special(req->op.mod.message->dn)) {
979                 return ldb_next_request(module, req);
980         }
981
982         ldb = ldb_module_get_ctx(module);
983
984         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
985
986         /* Get a sequence number from the backend */
987         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
988         if (ret != LDB_SUCCESS) {
989                 return ret;
990         }
991
992         msg = ldb_msg_new(req);
993         if (msg == NULL) {
994                 ldb_oom(ldb);
995                 return LDB_ERR_OPERATIONS_ERROR;
996         }
997
998         msg->dn = req->op.rename.olddn;
999
1000         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
1001                 talloc_free(msg);
1002                 return LDB_ERR_OPERATIONS_ERROR;
1003         }
1004         msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
1005
1006         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
1007                 talloc_free(msg);
1008                 return LDB_ERR_OPERATIONS_ERROR;
1009         }
1010         msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
1011
1012         ret = ldb_modify(ldb, msg);
1013         talloc_free(msg);
1014         if (ret != LDB_SUCCESS) {
1015                 return ret;
1016         }
1017
1018         ret = replmd_load_NCs(module);
1019         if (ret != 0) {
1020                 return ret;
1021         }
1022
1023         /* now update the highest uSNs of the partitions that are
1024            affected. Note that two partitions could be changing */
1025         for (i=0; i<replmd_private->num_ncs; i++) {
1026                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1027                                         req->op.rename.olddn) == 0) {
1028                         break;
1029                 }
1030         }
1031         if (i == replmd_private->num_ncs) {
1032                 DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
1033                          ldb_dn_get_linearized(req->op.rename.olddn)));
1034                 return LDB_ERR_OPERATIONS_ERROR;
1035         }
1036         replmd_private->ncs[i].mod_usn = seq_num;
1037
1038         for (i=0; i<replmd_private->num_ncs; i++) {
1039                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1040                                         req->op.rename.newdn) == 0) {
1041                         break;
1042                 }
1043         }
1044         if (i == replmd_private->num_ncs) {
1045                 DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
1046                          ldb_dn_get_linearized(req->op.rename.newdn)));
1047                 return LDB_ERR_OPERATIONS_ERROR;
1048         }
1049         replmd_private->ncs[i].mod_usn = seq_num;
1050         
1051         /* go on with the call chain */
1052         return ldb_next_request(module, req);
1053 }
1054
1055
/*
  map an ldb error of a replicated request to the code that is handed
  back to the caller

  Currently this is the identity mapping: the given ret is returned
  unchanged and the request context is not looked at.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        (void)ar;

        return ret;
}
1060
1061 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1062 {
1063         int ret = LDB_ERR_OTHER;
1064         /* TODO: do some error mapping */
1065         return ret;
1066 }
1067
1068 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1069
1070 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1071                                                 struct ldb_reply *ares)
1072 {
1073         struct ldb_context *ldb;
1074         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1075                                                struct replmd_replicated_request);
1076         int ret;
1077
1078         ldb = ldb_module_get_ctx(ar->module);
1079
1080         if (!ares) {
1081                 return ldb_module_done(ar->req, NULL, NULL,
1082                                         LDB_ERR_OPERATIONS_ERROR);
1083         }
1084         if (ares->error != LDB_SUCCESS) {
1085                 return ldb_module_done(ar->req, ares->controls,
1086                                         ares->response, ares->error);
1087         }
1088
1089         if (ares->type != LDB_REPLY_DONE) {
1090                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1091                 return ldb_module_done(ar->req, NULL, NULL,
1092                                         LDB_ERR_OPERATIONS_ERROR);
1093         }
1094
1095         talloc_free(ares);
1096         ar->index_current++;
1097
1098         ret = replmd_replicated_apply_next(ar);
1099         if (ret != LDB_SUCCESS) {
1100                 return ldb_module_done(ar->req, NULL, NULL, ret);
1101         }
1102
1103         return LDB_SUCCESS;
1104 }
1105
1106 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1107 {
1108         struct ldb_context *ldb;
1109         struct ldb_request *change_req;
1110         enum ndr_err_code ndr_err;
1111         struct ldb_message *msg;
1112         struct replPropertyMetaDataBlob *md;
1113         struct ldb_val md_value;
1114         uint32_t i;
1115         uint64_t seq_num;
1116         int ret;
1117
1118         /*
1119          * TODO: check if the parent object exist
1120          */
1121
1122         /*
1123          * TODO: handle the conflict case where an object with the
1124          *       same name exist
1125          */
1126
1127         ldb = ldb_module_get_ctx(ar->module);
1128         msg = ar->objs->objects[ar->index_current].msg;
1129         md = ar->objs->objects[ar->index_current].meta_data;
1130
1131         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1132         if (ret != LDB_SUCCESS) {
1133                 return replmd_replicated_request_error(ar, ret);
1134         }
1135
1136         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1137         if (ret != LDB_SUCCESS) {
1138                 return replmd_replicated_request_error(ar, ret);
1139         }
1140
1141         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1142         if (ret != LDB_SUCCESS) {
1143                 return replmd_replicated_request_error(ar, ret);
1144         }
1145
1146         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1147         if (ret != LDB_SUCCESS) {
1148                 return replmd_replicated_request_error(ar, ret);
1149         }
1150
1151         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1152         if (ret != LDB_SUCCESS) {
1153                 return replmd_replicated_request_error(ar, ret);
1154         }
1155
1156         ret = replmd_notify(ar->module, msg->dn, seq_num);
1157         if (ret != LDB_SUCCESS) {
1158                 return replmd_replicated_request_error(ar, ret);
1159         }
1160
1161         /* remove any message elements that have zero values */
1162         for (i=0; i<msg->num_elements; i++) {
1163                 if (msg->elements[i].num_values == 0) {
1164                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1165                                  msg->elements[i].name));
1166                         memmove(&msg->elements[i], 
1167                                 &msg->elements[i+1], 
1168                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1169                         msg->num_elements--;
1170                         i--;
1171                 }
1172         }
1173         
1174         /*
1175          * the meta data array is already sorted by the caller
1176          */
1177         for (i=0; i < md->ctr.ctr1.count; i++) {
1178                 md->ctr.ctr1.array[i].local_usn = seq_num;
1179         }
1180         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1181                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1182                                        md,
1183                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1184         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1185                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1186                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1187         }
1188         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1189         if (ret != LDB_SUCCESS) {
1190                 return replmd_replicated_request_error(ar, ret);
1191         }
1192
1193         replmd_ldb_message_sort(msg, ar->schema);
1194
1195         if (DEBUGLVL(4)) {
1196                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1197                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1198                 talloc_free(s);
1199         }
1200
1201         ret = ldb_build_add_req(&change_req,
1202                                 ldb,
1203                                 ar,
1204                                 msg,
1205                                 ar->controls,
1206                                 ar,
1207                                 replmd_replicated_apply_add_callback,
1208                                 ar->req);
1209         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1210
1211         return ldb_next_request(ar->module, change_req);
1212 }
1213
1214 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1215                                                          struct replPropertyMetaData1 *m2)
1216 {
1217         int ret;
1218
1219         if (m1->version != m2->version) {
1220                 return m1->version - m2->version;
1221         }
1222
1223         if (m1->originating_change_time != m2->originating_change_time) {
1224                 return m1->originating_change_time - m2->originating_change_time;
1225         }
1226
1227         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1228         if (ret != 0) {
1229                 return ret;
1230         }
1231
1232         return m1->originating_usn - m2->originating_usn;
1233 }
1234
1235 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1236                                                   struct ldb_reply *ares)
1237 {
1238         struct ldb_context *ldb;
1239         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1240                                                struct replmd_replicated_request);
1241         int ret;
1242
1243         ldb = ldb_module_get_ctx(ar->module);
1244
1245         if (!ares) {
1246                 return ldb_module_done(ar->req, NULL, NULL,
1247                                         LDB_ERR_OPERATIONS_ERROR);
1248         }
1249         if (ares->error != LDB_SUCCESS) {
1250                 return ldb_module_done(ar->req, ares->controls,
1251                                         ares->response, ares->error);
1252         }
1253
1254         if (ares->type != LDB_REPLY_DONE) {
1255                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1256                 return ldb_module_done(ar->req, NULL, NULL,
1257                                         LDB_ERR_OPERATIONS_ERROR);
1258         }
1259
1260         talloc_free(ares);
1261         ar->index_current++;
1262
1263         ret = replmd_replicated_apply_next(ar);
1264         if (ret != LDB_SUCCESS) {
1265                 return ldb_module_done(ar->req, NULL, NULL, ret);
1266         }
1267
1268         return LDB_SUCCESS;
1269 }
1270
1271 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1272 {
1273         struct ldb_context *ldb;
1274         struct ldb_request *change_req;
1275         enum ndr_err_code ndr_err;
1276         struct ldb_message *msg;
1277         struct replPropertyMetaDataBlob *rmd;
1278         struct replPropertyMetaDataBlob omd;
1279         const struct ldb_val *omd_value;
1280         struct replPropertyMetaDataBlob nmd;
1281         struct ldb_val nmd_value;
1282         uint32_t i,j,ni=0;
1283         uint32_t removed_attrs = 0;
1284         uint64_t seq_num;
1285         int ret;
1286
1287         ldb = ldb_module_get_ctx(ar->module);
1288         msg = ar->objs->objects[ar->index_current].msg;
1289         rmd = ar->objs->objects[ar->index_current].meta_data;
1290         ZERO_STRUCT(omd);
1291         omd.version = 1;
1292
1293         /*
1294          * TODO: check repl data is correct after a rename
1295          */
1296         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1297                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1298                           ldb_dn_get_linearized(ar->search_msg->dn),
1299                           ldb_dn_get_linearized(msg->dn));
1300                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1301                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1302                                   ldb_dn_get_linearized(ar->search_msg->dn),
1303                                   ldb_dn_get_linearized(msg->dn),
1304                                   ldb_errstring(ldb));
1305                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1306                 }
1307         }
1308
1309         /* find existing meta data */
1310         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1311         if (omd_value) {
1312                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1313                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1314                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1315                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1316                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1317                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1318                 }
1319
1320                 if (omd.version != 1) {
1321                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1322                 }
1323         }
1324
1325         ZERO_STRUCT(nmd);
1326         nmd.version = 1;
1327         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1328         nmd.ctr.ctr1.array = talloc_array(ar,
1329                                           struct replPropertyMetaData1,
1330                                           nmd.ctr.ctr1.count);
1331         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1332
1333         /* first copy the old meta data */
1334         for (i=0; i < omd.ctr.ctr1.count; i++) {
1335                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1336                 ni++;
1337         }
1338
1339         /* now merge in the new meta data */
1340         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1341                 bool found = false;
1342
1343                 for (j=0; j < ni; j++) {
1344                         int cmp;
1345
1346                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1347                                 continue;
1348                         }
1349
1350                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1351                                                                             &nmd.ctr.ctr1.array[j]);
1352                         if (cmp > 0) {
1353                                 /* replace the entry */
1354                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1355                                 found = true;
1356                                 break;
1357                         }
1358
1359                         /* we don't want to apply this change so remove the attribute */
1360                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1361                         removed_attrs++;
1362
1363                         found = true;
1364                         break;
1365                 }
1366
1367                 if (found) continue;
1368
1369                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1370                 ni++;
1371         }
1372
1373         /*
1374          * finally correct the size of the meta_data array
1375          */
1376         nmd.ctr.ctr1.count = ni;
1377
1378         /*
1379          * the rdn attribute (the alias for the name attribute),
1380          * 'cn' for most objects is the last entry in the meta data array
1381          * we have stored
1382          *
1383          * sort the new meta data array
1384          */
1385         {
1386                 struct replPropertyMetaData1 *rdn_p;
1387                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1388
1389                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1390                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1391         }
1392
1393         /*
1394          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1395          */
1396         if (msg->num_elements == 0) {
1397                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1398                           ar->index_current);
1399
1400                 ar->index_current++;
1401                 return replmd_replicated_apply_next(ar);
1402         }
1403
1404         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1405                   ar->index_current, msg->num_elements);
1406
1407         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1408         if (ret != LDB_SUCCESS) {
1409                 return replmd_replicated_request_error(ar, ret);
1410         }
1411
1412         for (i=0; i<ni; i++) {
1413                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1414         }
1415
1416         /* create the meta data value */
1417         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1418                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1419                                        &nmd,
1420                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1421         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1422                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1423                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1424         }
1425
1426         /*
1427          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1428          * and replPopertyMetaData attributes
1429          */
1430         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1431         if (ret != LDB_SUCCESS) {
1432                 return replmd_replicated_request_error(ar, ret);
1433         }
1434         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1435         if (ret != LDB_SUCCESS) {
1436                 return replmd_replicated_request_error(ar, ret);
1437         }
1438         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1439         if (ret != LDB_SUCCESS) {
1440                 return replmd_replicated_request_error(ar, ret);
1441         }
1442
1443         replmd_ldb_message_sort(msg, ar->schema);
1444
1445         /* we want to replace the old values */
1446         for (i=0; i < msg->num_elements; i++) {
1447                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1448         }
1449
1450         ret = replmd_notify(ar->module, msg->dn, seq_num);
1451         if (ret != LDB_SUCCESS) {
1452                 return replmd_replicated_request_error(ar, ret);
1453         }
1454
1455         if (DEBUGLVL(4)) {
1456                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1457                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1458                 talloc_free(s);
1459         }
1460
1461         ret = ldb_build_mod_req(&change_req,
1462                                 ldb,
1463                                 ar,
1464                                 msg,
1465                                 ar->controls,
1466                                 ar,
1467                                 replmd_replicated_apply_merge_callback,
1468                                 ar->req);
1469         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1470
1471         return ldb_next_request(ar->module, change_req);
1472 }
1473
1474 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1475                                                    struct ldb_reply *ares)
1476 {
1477         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1478                                                struct replmd_replicated_request);
1479         int ret;
1480
1481         if (!ares) {
1482                 return ldb_module_done(ar->req, NULL, NULL,
1483                                         LDB_ERR_OPERATIONS_ERROR);
1484         }
1485         if (ares->error != LDB_SUCCESS &&
1486             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1487                 return ldb_module_done(ar->req, ares->controls,
1488                                         ares->response, ares->error);
1489         }
1490
1491         switch (ares->type) {
1492         case LDB_REPLY_ENTRY:
1493                 ar->search_msg = talloc_steal(ar, ares->message);
1494                 break;
1495
1496         case LDB_REPLY_REFERRAL:
1497                 /* we ignore referrals */
1498                 break;
1499
1500         case LDB_REPLY_DONE:
1501                 if (ar->search_msg != NULL) {
1502                         ret = replmd_replicated_apply_merge(ar);
1503                 } else {
1504                         ret = replmd_replicated_apply_add(ar);
1505                 }
1506                 if (ret != LDB_SUCCESS) {
1507                         return ldb_module_done(ar->req, NULL, NULL, ret);
1508                 }
1509         }
1510
1511         talloc_free(ares);
1512         return LDB_SUCCESS;
1513 }
1514
1515 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1516
1517 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1518 {
1519         struct ldb_context *ldb;
1520         int ret;
1521         char *tmp_str;
1522         char *filter;
1523         struct ldb_request *search_req;
1524
1525         if (ar->index_current >= ar->objs->num_objects) {
1526                 /* done with it, go to next stage */
1527                 return replmd_replicated_uptodate_vector(ar);
1528         }
1529
1530         ldb = ldb_module_get_ctx(ar->module);
1531         ar->search_msg = NULL;
1532
1533         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1534         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1535
1536         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1537         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1538         talloc_free(tmp_str);
1539
1540         ret = ldb_build_search_req(&search_req,
1541                                    ldb,
1542                                    ar,
1543                                    ar->objs->partition_dn,
1544                                    LDB_SCOPE_SUBTREE,
1545                                    filter,
1546                                    NULL,
1547                                    NULL,
1548                                    ar,
1549                                    replmd_replicated_apply_search_callback,
1550                                    ar->req);
1551         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1552
1553         return ldb_next_request(ar->module, search_req);
1554 }
1555
1556 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1557                                                       struct ldb_reply *ares)
1558 {
1559         struct ldb_context *ldb;
1560         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1561                                                struct replmd_replicated_request);
1562         ldb = ldb_module_get_ctx(ar->module);
1563
1564         if (!ares) {
1565                 return ldb_module_done(ar->req, NULL, NULL,
1566                                         LDB_ERR_OPERATIONS_ERROR);
1567         }
1568         if (ares->error != LDB_SUCCESS) {
1569                 return ldb_module_done(ar->req, ares->controls,
1570                                         ares->response, ares->error);
1571         }
1572
1573         if (ares->type != LDB_REPLY_DONE) {
1574                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1575                 return ldb_module_done(ar->req, NULL, NULL,
1576                                         LDB_ERR_OPERATIONS_ERROR);
1577         }
1578
1579         talloc_free(ares);
1580
1581         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1582 }
1583
1584 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1585 {
1586         struct ldb_context *ldb;
1587         struct ldb_request *change_req;
1588         enum ndr_err_code ndr_err;
1589         struct ldb_message *msg;
1590         struct replUpToDateVectorBlob ouv;
1591         const struct ldb_val *ouv_value;
1592         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1593         struct replUpToDateVectorBlob nuv;
1594         struct ldb_val nuv_value;
1595         struct ldb_message_element *nuv_el = NULL;
1596         const struct GUID *our_invocation_id;
1597         struct ldb_message_element *orf_el = NULL;
1598         struct repsFromToBlob nrf;
1599         struct ldb_val *nrf_value = NULL;
1600         struct ldb_message_element *nrf_el = NULL;
1601         uint32_t i,j,ni=0;
1602         bool found = false;
1603         time_t t = time(NULL);
1604         NTTIME now;
1605         int ret;
1606
1607         ldb = ldb_module_get_ctx(ar->module);
1608         ruv = ar->objs->uptodateness_vector;
1609         ZERO_STRUCT(ouv);
1610         ouv.version = 2;
1611         ZERO_STRUCT(nuv);
1612         nuv.version = 2;
1613
1614         unix_to_nt_time(&now, t);
1615
1616         /*
1617          * first create the new replUpToDateVector
1618          */
1619         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1620         if (ouv_value) {
1621                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1622                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1623                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1624                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1625                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1626                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1627                 }
1628
1629                 if (ouv.version != 2) {
1630                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1631                 }
1632         }
1633
1634         /*
1635          * the new uptodateness vector will at least
1636          * contain 1 entry, one for the source_dsa
1637          *
1638          * plus optional values from our old vector and the one from the source_dsa
1639          */
1640         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1641         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1642         nuv.ctr.ctr2.cursors = talloc_array(ar,
1643                                             struct drsuapi_DsReplicaCursor2,
1644                                             nuv.ctr.ctr2.count);
1645         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1646
1647         /* first copy the old vector */
1648         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1649                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1650                 ni++;
1651         }
1652
1653         /* get our invocation_id if we have one already attached to the ldb */
1654         our_invocation_id = samdb_ntds_invocation_id(ldb);
1655
1656         /* merge in the source_dsa vector is available */
1657         for (i=0; (ruv && i < ruv->count); i++) {
1658                 found = false;
1659
1660                 if (our_invocation_id &&
1661                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1662                                our_invocation_id)) {
1663                         continue;
1664                 }
1665
1666                 for (j=0; j < ni; j++) {
1667                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1668                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1669                                 continue;
1670                         }
1671
1672                         found = true;
1673
1674                         /*
1675                          * we update only the highest_usn and not the latest_sync_success time,
1676                          * because the last success stands for direct replication
1677                          */
1678                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1679                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1680                         }
1681                         break;                  
1682                 }
1683
1684                 if (found) continue;
1685
1686                 /* if it's not there yet, add it */
1687                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1688                 ni++;
1689         }
1690
1691         /*
1692          * merge in the current highwatermark for the source_dsa
1693          */
1694         found = false;
1695         for (j=0; j < ni; j++) {
1696                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1697                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1698                         continue;
1699                 }
1700
1701                 found = true;
1702
1703                 /*
1704                  * here we update the highest_usn and last_sync_success time
1705                  * because we're directly replicating from the source_dsa
1706                  *
1707                  * and use the tmp_highest_usn because this is what we have just applied
1708                  * to our ldb
1709                  */
1710                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1711                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1712                 break;
1713         }
1714         if (!found) {
1715                 /*
1716                  * here we update the highest_usn and last_sync_success time
1717                  * because we're directly replicating from the source_dsa
1718                  *
1719                  * and use the tmp_highest_usn because this is what we have just applied
1720                  * to our ldb
1721                  */
1722                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1723                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1724                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1725                 ni++;
1726         }
1727
1728         /*
1729          * finally correct the size of the cursors array
1730          */
1731         nuv.ctr.ctr2.count = ni;
1732
1733         /*
1734          * sort the cursors
1735          */
1736         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1737               sizeof(struct drsuapi_DsReplicaCursor2),
1738               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1739
1740         /*
1741          * create the change ldb_message
1742          */
1743         msg = ldb_msg_new(ar);
1744         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1745         msg->dn = ar->search_msg->dn;
1746
1747         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1748                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1749                                        &nuv,
1750                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1751         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1752                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1753                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1754         }
1755         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1756         if (ret != LDB_SUCCESS) {
1757                 return replmd_replicated_request_error(ar, ret);
1758         }
1759         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1760
1761         /*
1762          * now create the new repsFrom value from the given repsFromTo1 structure
1763          */
1764         ZERO_STRUCT(nrf);
1765         nrf.version                                     = 1;
1766         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1767         /* and fix some values... */
1768         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1769         nrf.ctr.ctr1.last_success                       = now;
1770         nrf.ctr.ctr1.last_attempt                       = now;
1771         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1772         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1773
1774         /*
1775          * first see if we already have a repsFrom value for the current source dsa
1776          * if so we'll later replace this value
1777          */
1778         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1779         if (orf_el) {
1780                 for (i=0; i < orf_el->num_values; i++) {
1781                         struct repsFromToBlob *trf;
1782
1783                         trf = talloc(ar, struct repsFromToBlob);
1784                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1785
1786                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1787                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1788                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1789                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1790                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1791                         }
1792
1793                         if (trf->version != 1) {
1794                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1795                         }
1796
1797                         /*
1798                          * we compare the source dsa objectGUID not the invocation_id
1799                          * because we want only one repsFrom value per source dsa
1800                          * and when the invocation_id of the source dsa has changed we don't need 
1801                          * the old repsFrom with the old invocation_id
1802                          */
1803                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1804                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1805                                 talloc_free(trf);
1806                                 continue;
1807                         }
1808
1809                         talloc_free(trf);
1810                         nrf_value = &orf_el->values[i];
1811                         break;
1812                 }
1813
1814                 /*
1815                  * copy over all old values to the new ldb_message
1816                  */
1817                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1818                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1819                 *nrf_el = *orf_el;
1820         }
1821
1822         /*
1823          * if we haven't found an old repsFrom value for the current source dsa
1824          * we'll add a new value
1825          */
1826         if (!nrf_value) {
1827                 struct ldb_val zero_value;
1828                 ZERO_STRUCT(zero_value);
1829                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1830                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1831
1832                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1833         }
1834
1835         /* we now fill the value which is already attached to ldb_message */
1836         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1837                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1838                                        &nrf,
1839                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1840         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1841                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1842                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1843         }
1844
1845         /* 
1846          * the ldb_message_element for the attribute, has all the old values and the new one
1847          * so we'll replace the whole attribute with all values
1848          */
1849         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1850
1851         if (DEBUGLVL(4)) {
1852                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1853                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1854                 talloc_free(s);
1855         }
1856
1857         /* prepare the ldb_modify() request */
1858         ret = ldb_build_mod_req(&change_req,
1859                                 ldb,
1860                                 ar,
1861                                 msg,
1862                                 ar->controls,
1863                                 ar,
1864                                 replmd_replicated_uptodate_modify_callback,
1865                                 ar->req);
1866         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1867
1868         return ldb_next_request(ar->module, change_req);
1869 }
1870
1871 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1872                                                       struct ldb_reply *ares)
1873 {
1874         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1875                                                struct replmd_replicated_request);
1876         int ret;
1877
1878         if (!ares) {
1879                 return ldb_module_done(ar->req, NULL, NULL,
1880                                         LDB_ERR_OPERATIONS_ERROR);
1881         }
1882         if (ares->error != LDB_SUCCESS &&
1883             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1884                 return ldb_module_done(ar->req, ares->controls,
1885                                         ares->response, ares->error);
1886         }
1887
1888         switch (ares->type) {
1889         case LDB_REPLY_ENTRY:
1890                 ar->search_msg = talloc_steal(ar, ares->message);
1891                 break;
1892
1893         case LDB_REPLY_REFERRAL:
1894                 /* we ignore referrals */
1895                 break;
1896
1897         case LDB_REPLY_DONE:
1898                 if (ar->search_msg == NULL) {
1899                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1900                 } else {
1901                         ret = replmd_replicated_uptodate_modify(ar);
1902                 }
1903                 if (ret != LDB_SUCCESS) {
1904                         return ldb_module_done(ar->req, NULL, NULL, ret);
1905                 }
1906         }
1907
1908         talloc_free(ares);
1909         return LDB_SUCCESS;
1910 }
1911
1912
1913 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1914 {
1915         struct ldb_context *ldb;
1916         int ret;
1917         static const char *attrs[] = {
1918                 "replUpToDateVector",
1919                 "repsFrom",
1920                 NULL
1921         };
1922         struct ldb_request *search_req;
1923
1924         ldb = ldb_module_get_ctx(ar->module);
1925         ar->search_msg = NULL;
1926
1927         ret = ldb_build_search_req(&search_req,
1928                                    ldb,
1929                                    ar,
1930                                    ar->objs->partition_dn,
1931                                    LDB_SCOPE_BASE,
1932                                    "(objectClass=*)",
1933                                    attrs,
1934                                    NULL,
1935                                    ar,
1936                                    replmd_replicated_uptodate_search_callback,
1937                                    ar->req);
1938         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1939
1940         return ldb_next_request(ar->module, search_req);
1941 }
1942
1943
1944
1945 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1946 {
1947         struct ldb_context *ldb;
1948         struct dsdb_extended_replicated_objects *objs;
1949         struct replmd_replicated_request *ar;
1950         struct ldb_control **ctrls;
1951         int ret, i;
1952         struct dsdb_control_current_partition *partition_ctrl;
1953         struct replmd_private *replmd_private = 
1954                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1955
1956         ldb = ldb_module_get_ctx(module);
1957
1958         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1959
1960         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1961         if (!objs) {
1962                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1963                 return LDB_ERR_PROTOCOL_ERROR;
1964         }
1965
1966         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1967                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1968                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1969                 return LDB_ERR_PROTOCOL_ERROR;
1970         }
1971
1972         ar = replmd_ctx_init(module, req);
1973         if (!ar)
1974                 return LDB_ERR_OPERATIONS_ERROR;
1975
1976         ar->objs = objs;
1977         ar->schema = dsdb_get_schema(ldb);
1978         if (!ar->schema) {
1979                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1980                 talloc_free(ar);
1981                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
1982                 return LDB_ERR_CONSTRAINT_VIOLATION;
1983         }
1984
1985         ctrls = req->controls;
1986
1987         if (req->controls) {
1988                 req->controls = talloc_memdup(ar, req->controls,
1989                                               talloc_get_size(req->controls));
1990                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1991         }
1992
1993         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1994         if (ret != LDB_SUCCESS) {
1995                 return ret;
1996         }
1997
1998         /*
1999           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
2000           tells the partition module which partition this request is
2001           directed at. That is important as the partition roots appear
2002           twice in the directory, once as mount points in the top
2003           level store, and once as the roots of each partition. The
2004           replication code wants to operate on the root of the
2005           partitions, not the top level mount points
2006          */
2007         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
2008         if (partition_ctrl == NULL) {
2009                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2010         }
2011         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
2012         partition_ctrl->dn = objs->partition_dn;
2013
2014         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
2015         if (ret != LDB_SUCCESS) {
2016                 return ret;
2017         }
2018
2019         ar->controls = req->controls;
2020         req->controls = ctrls;
2021
2022         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
2023
2024         /* save away the linked attributes for the end of the
2025            transaction */
2026         for (i=0; i<ar->objs->linked_attributes_count; i++) {
2027                 struct la_entry *la_entry;
2028
2029                 if (replmd_private->la_ctx == NULL) {
2030                         replmd_private->la_ctx = talloc_new(replmd_private);
2031                 }
2032                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
2033                 if (la_entry == NULL) {
2034                         ldb_oom(ldb);
2035                         return LDB_ERR_OPERATIONS_ERROR;
2036                 }
2037                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2038                 if (la_entry->la == NULL) {
2039                         talloc_free(la_entry);
2040                         ldb_oom(ldb);
2041                         return LDB_ERR_OPERATIONS_ERROR;
2042                 }
2043                 *la_entry->la = ar->objs->linked_attributes[i];
2044
2045                 /* we need to steal the non-scalars so they stay
2046                    around until the end of the transaction */
2047                 talloc_steal(la_entry->la, la_entry->la->identifier);
2048                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2049
2050                 DLIST_ADD(replmd_private->la_list, la_entry);
2051         }
2052
2053         return replmd_replicated_apply_next(ar);
2054 }
2055
2056 /*
2057   process one linked attribute structure
2058  */
2059 static int replmd_process_linked_attribute(struct ldb_module *module,
2060                                            struct la_entry *la_entry)
2061 {                                          
2062         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2063         struct ldb_context *ldb = ldb_module_get_ctx(module);
2064         struct drsuapi_DsReplicaObjectIdentifier3 target;
2065         struct ldb_message *msg;
2066         struct ldb_message_element *ret_el;
2067         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2068         enum ndr_err_code ndr_err;
2069         char *target_dn;
2070         struct ldb_request *mod_req;
2071         int ret;
2072         const struct dsdb_attribute *attr;
2073
2074 /*
2075 linked_attributes[0]:                                                     
2076      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2077         identifier               : *                                      
2078             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2079                 __ndr_size               : 0x0000003a (58)                
2080                 __ndr_size_sid           : 0x00000000 (0)                 
2081                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2082                 sid                      : S-0-0                               
2083                 __ndr_size_dn            : 0x00000000 (0)                      
2084                 dn                       : ''                                  
2085         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2086         value: struct drsuapi_DsAttributeValue                                 
2087             __ndr_size               : 0x0000007e (126)                        
2088             blob                     : *                                       
2089                 blob                     : DATA_BLOB length=126                
2090         flags                    : 0x00000001 (1)                              
2091                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2092         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2093         meta_data: struct drsuapi_DsReplicaMetaData                            
2094             version                  : 0x00000015 (21)                         
2095             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2096             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2097             originating_usn          : 0x000000000001e19c (123292)             
2098      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2099         __ndr_size               : 0x0000007e (126)                            
2100         __ndr_size_sid           : 0x0000001c (28)                             
2101         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2102         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2103         __ndr_size_dn            : 0x00000022 (34)                                
2104         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2105  */
2106         if (DEBUGLVL(4)) {
2107                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2108         }
2109         
2110         /* decode the target of the link */
2111         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2112                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2113                                        &target,
2114                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2115         if (ndr_err != NDR_ERR_SUCCESS) {
2116                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2117                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2118                 talloc_free(tmp_ctx);
2119                 return LDB_ERR_OPERATIONS_ERROR;
2120         }
2121         if (DEBUGLVL(4)) {
2122                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2123         }
2124
2125         /* construct a modify request for this attribute change */
2126         msg = ldb_msg_new(tmp_ctx);
2127         if (!msg) {
2128                 ldb_oom(ldb);
2129                 talloc_free(tmp_ctx);
2130                 return LDB_ERR_OPERATIONS_ERROR;
2131         }
2132
2133         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2134                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2135         if (ret != LDB_SUCCESS) {
2136                 talloc_free(tmp_ctx);
2137                 return ret;
2138         }
2139
2140         /* find the attribute being modified */
2141         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2142         if (attr == NULL) {
2143                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2144                 talloc_free(tmp_ctx);
2145                 return LDB_ERR_OPERATIONS_ERROR;
2146         }
2147
2148         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2149                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2150                                         LDB_FLAG_MOD_ADD, &ret_el);
2151         } else {
2152                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2153                                         LDB_FLAG_MOD_DELETE, &ret_el);
2154         }
2155         if (ret != LDB_SUCCESS) {
2156                 talloc_free(tmp_ctx);
2157                 return ret;
2158         }
2159         /* we allocate two entries here, in case we need a remove/add
2160            pair */
2161         ret_el->values = talloc_array(msg, struct ldb_val, 2);
2162         if (!ret_el->values) {
2163                 ldb_oom(ldb);
2164                 talloc_free(tmp_ctx);
2165                 return LDB_ERR_OPERATIONS_ERROR;
2166         }
2167         ret_el->num_values = 1;
2168
2169         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
2170                                     GUID_string(tmp_ctx, &target.guid),
2171                                     dom_sid_string(tmp_ctx, &target.sid),
2172                                     target.dn);
2173         if (target_dn == NULL) {
2174                 ldb_oom(ldb);
2175                 talloc_free(tmp_ctx);
2176                 return LDB_ERR_OPERATIONS_ERROR;
2177         }
2178         ret_el->values[0] = data_blob_string_const(target_dn);
2179
2180         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2181                                 msg,
2182                                 NULL,
2183                                 NULL, 
2184                                 ldb_op_default_callback,
2185                                 NULL);
2186         if (ret != LDB_SUCCESS) {
2187                 talloc_free(tmp_ctx);
2188                 return ret;
2189         }
2190         talloc_steal(mod_req, msg);
2191
2192         if (DEBUGLVL(4)) {
2193                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2194                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2195         }
2196
2197         /* Run the new request */
2198         ret = ldb_next_request(module, mod_req);
2199
2200         /* we need to wait for this to finish, as we are being called
2201            from the synchronous end_transaction hook of this module */
2202         if (ret == LDB_SUCCESS) {
2203                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2204         }
2205
2206         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2207                 /* the link destination exists, we need to update it
2208                  * by deleting the old one for the same DN then adding
2209                  * the new one */
2210                 msg->elements = talloc_realloc(msg, msg->elements,
2211                                                struct ldb_message_element,
2212                                                msg->num_elements+1);
2213                 if (msg->elements == NULL) {
2214                         ldb_oom(ldb);
2215                         talloc_free(tmp_ctx);
2216                         return LDB_ERR_OPERATIONS_ERROR;
2217                 }
2218                 /* this relies on the backend matching the old entry
2219                    only by the DN portion of the extended DN */
2220                 msg->elements[1] = msg->elements[0];
2221                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2222                 msg->num_elements++;
2223
2224                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2225                                         msg,
2226                                         NULL,
2227                                         NULL, 
2228                                         ldb_op_default_callback,
2229                                         NULL);
2230                 if (ret != LDB_SUCCESS) {
2231                         talloc_free(tmp_ctx);
2232                         return ret;
2233                 }
2234
2235                 /* Run the new request */
2236                 ret = ldb_next_request(module, mod_req);
2237                 
2238                 if (ret == LDB_SUCCESS) {
2239                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2240                 }
2241         }
2242
2243         if (ret != LDB_SUCCESS) {
2244                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2245                           ldb_errstring(ldb),
2246                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2247                 ret = LDB_SUCCESS;
2248         }
2249         
2250         talloc_free(tmp_ctx);
2251
2252         return ret;     
2253 }
2254
2255 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2256 {
2257         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2258                 return replmd_extended_replicated_objects(module, req);
2259         }
2260
2261         return ldb_next_request(module, req);
2262 }
2263
2264
2265 /*
2266   we hook into the transaction operations to allow us to 
2267   perform the linked attribute updates at the end of the whole
2268   transaction. This allows a forward linked attribute to be created
2269   before the object is created. During a vampire, w2k8 sends us linked
2270   attributes before the objects they are part of.
2271  */
2272 static int replmd_start_transaction(struct ldb_module *module)
2273 {
2274         /* create our private structure for this transaction */
2275         int i;
2276         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2277                                                                 struct replmd_private);
2278         talloc_free(replmd_private->la_ctx);
2279         replmd_private->la_list = NULL;
2280         replmd_private->la_ctx = NULL;
2281
2282         for (i=0; i<replmd_private->num_ncs; i++) {
2283                 replmd_private->ncs[i].mod_usn = 0;
2284         }
2285
2286         return ldb_next_start_trans(module);
2287 }
2288
2289 /*
2290   on prepare commit we loop over our queued la_context structures and
2291   apply each of them  
2292  */
2293 static int replmd_prepare_commit(struct ldb_module *module)
2294 {
2295         struct replmd_private *replmd_private = 
2296                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2297         struct la_entry *la, *prev;
2298         int ret;
2299
2300         /* walk the list backwards, to do the first entry first, as we
2301          * added the entries with DLIST_ADD() which puts them at the
2302          * start of the list */
2303         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2304
2305         for (; la; la=prev) {
2306                 prev = la->prev;
2307                 DLIST_REMOVE(replmd_private->la_list, la);
2308                 ret = replmd_process_linked_attribute(module, la);
2309                 if (ret != LDB_SUCCESS) {
2310                         return ret;
2311                 }
2312         }
2313
2314         talloc_free(replmd_private->la_ctx);
2315         replmd_private->la_list = NULL;
2316         replmd_private->la_ctx = NULL;
2317
2318         /* possibly change @REPLCHANGED */
2319         ret = replmd_notify_store(module);
2320         if (ret != LDB_SUCCESS) {
2321                 return ret;
2322         }
2323         
2324         return ldb_next_prepare_commit(module);
2325 }
2326
2327 static int replmd_del_transaction(struct ldb_module *module)
2328 {
2329         struct replmd_private *replmd_private = 
2330                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2331         talloc_free(replmd_private->la_ctx);
2332         replmd_private->la_list = NULL;
2333         replmd_private->la_ctx = NULL;
2334         return ldb_next_del_trans(module);
2335 }
2336
2337
2338 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2339         .name          = "repl_meta_data",
2340         .init_context      = replmd_init,
2341         .add               = replmd_add,
2342         .modify            = replmd_modify,
2343         .rename            = replmd_rename,
2344         .extended          = replmd_extended,
2345         .start_transaction = replmd_start_transaction,
2346         .prepare_commit    = replmd_prepare_commit,
2347         .del_transaction   = replmd_del_transaction,
2348 };