s4-drs: add defines for replication flags on attributes
[kai/samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         TALLOC_CTX *la_ctx;
51         struct la_entry *la_list;
52         uint32_t num_ncs;
53         struct nc_entry {
54                 struct ldb_dn *dn;
55                 struct GUID guid;
56                 uint64_t mod_usn;
57         } *ncs;
58 };
59
/*
 * Doubly linked list node; each node points at one
 * drsuapi_DsReplicaLinkedAttribute.
 */
struct la_entry {
        struct la_entry *next;
        struct la_entry *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
 * Per-request context of this module. replmd_ctx_init() allocates it
 * zeroed as a talloc child of the request and fills in module and
 * req; the remaining fields are set by the individual operations.
 */
struct replmd_replicated_request {
        /* the module instance handling the request */
        struct ldb_module *module;
        /* the original request that is completed via ldb_module_done() */
        struct ldb_request *req;

        /* schema in effect for this request (set by replmd_add()) */
        const struct dsdb_schema *schema;

        /* NOTE(review): presumably the replicated objects being
           applied; not used in this part of the file - confirm */
        struct dsdb_extended_replicated_objects *objs;

        /* the controls we pass down */
        struct ldb_control **controls;

        /* NOTE(review): presumably the index of the object in objs
           currently being processed - confirm */
        uint32_t index_current;

        /* NOTE(review): presumably the result of a search done on
           behalf of this request - confirm */
        struct ldb_message *search_msg;
};
80
81
82 /*
83   initialise the module
84   allocate the private structure and build the list
85   of partition DNs for use by replmd_notify()
86  */
87 static int replmd_init(struct ldb_module *module)
88 {
89         struct replmd_private *replmd_private;
90         struct ldb_context *ldb = ldb_module_get_ctx(module);
91
92         replmd_private = talloc_zero(module, struct replmd_private);
93         if (replmd_private == NULL) {
94                 ldb_oom(ldb);
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97         ldb_module_set_private(module, replmd_private);
98
99         return ldb_next_init(module);
100 }
101
102
103 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
104 {
105         return ldb_dn_compare(n1->dn, n2->dn);
106 }
107
108 /*
109   build the list of partition DNs for use by replmd_notify()
110  */
111 static int replmd_load_NCs(struct ldb_module *module)
112 {
113         const char *attrs[] = { "namingContexts", NULL };
114         struct ldb_result *res = NULL;
115         int i, ret;
116         TALLOC_CTX *tmp_ctx;
117         struct ldb_context *ldb;
118         struct ldb_message_element *el;
119         struct replmd_private *replmd_private = 
120                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
121
122         if (replmd_private->ncs != NULL) {
123                 return LDB_SUCCESS;
124         }
125
126         ldb = ldb_module_get_ctx(module);
127         tmp_ctx = talloc_new(module);
128
129         /* load the list of naming contexts */
130         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
131                          LDB_SCOPE_BASE, attrs, NULL);
132         if (ret != LDB_SUCCESS ||
133             res->count != 1) {
134                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137
138         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
139         if (el == NULL) {
140                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
141                 return LDB_ERR_OPERATIONS_ERROR;                
142         }
143
144         replmd_private->num_ncs = el->num_values;
145         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
146                                            replmd_private->num_ncs);
147         if (replmd_private->ncs == NULL) {
148                 ldb_oom(ldb);
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151
152         for (i=0; i<replmd_private->num_ncs; i++) {
153                 replmd_private->ncs[i].dn = 
154                         ldb_dn_from_ldb_val(replmd_private->ncs, 
155                                             ldb, &el->values[i]);
156                 replmd_private->ncs[i].mod_usn = 0;
157         }
158
159         talloc_free(res);
160
161         /* now find the GUIDs of each of those DNs */
162         for (i=0; i<replmd_private->num_ncs; i++) {
163                 const char *attrs2[] = { "objectGUID", NULL };
164                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
165                                  LDB_SCOPE_BASE, attrs2, NULL);
166                 if (ret != LDB_SUCCESS ||
167                     res->count != 1) {
168                         /* this happens when the schema is first being
169                            setup */
170                         talloc_free(replmd_private->ncs);
171                         replmd_private->ncs = NULL;
172                         replmd_private->num_ncs = 0;
173                         talloc_free(tmp_ctx);
174                         return LDB_SUCCESS;
175                 }
176                 replmd_private->ncs[i].guid = 
177                         samdb_result_guid(res->msgs[0], "objectGUID");
178                 talloc_free(res);
179         }       
180
181         /* sort the NCs into order, most to least specific */
182         qsort(replmd_private->ncs, replmd_private->num_ncs,
183               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
184
185         
186         talloc_free(tmp_ctx);
187         
188         return LDB_SUCCESS;
189 }
190
191
192 /*
193  * notify the repl task that a object has changed. The notifies are
194  * gathered up in the replmd_private structure then written to the
195  * @REPLCHANGED object in each partition during the prepare_commit
196  */
197 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
198 {
199         int ret, i;
200         struct replmd_private *replmd_private = 
201                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
202
203         ret = replmd_load_NCs(module);
204         if (ret != LDB_SUCCESS) {
205                 return ret;
206         }
207         if (replmd_private->num_ncs == 0) {
208                 return LDB_SUCCESS;
209         }
210
211         for (i=0; i<replmd_private->num_ncs; i++) {
212                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
213                         break;
214                 }
215         }
216         if (i == replmd_private->num_ncs) {
217                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
218                          ldb_dn_get_linearized(dn)));
219                 return LDB_ERR_OPERATIONS_ERROR;
220         }
221
222         if (uSN > replmd_private->ncs[i].mod_usn) {
223             replmd_private->ncs[i].mod_usn = uSN;
224         }
225
226         return LDB_SUCCESS;
227 }
228
229
230 /*
231  * update a @REPLCHANGED record in each partition if there have been
232  * any writes of replicated data in the partition
233  */
234 static int replmd_notify_store(struct ldb_module *module)
235 {
236         int i;
237         struct replmd_private *replmd_private = 
238                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
239         struct ldb_context *ldb = ldb_module_get_ctx(module);
240
241         for (i=0; i<replmd_private->num_ncs; i++) {
242                 int ret;
243
244                 if (replmd_private->ncs[i].mod_usn == 0) {
245                         /* this partition has not changed in this
246                            transaction */
247                         continue;
248                 }
249
250                 ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
251                                               replmd_private->ncs[i].mod_usn);
252                 if (ret != LDB_SUCCESS) {
253                         DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
254                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
255                         return ret;
256                 }
257         }
258
259         return LDB_SUCCESS;
260 }
261
262
263 /*
264   created a replmd_replicated_request context
265  */
266 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
267                                                          struct ldb_request *req)
268 {
269         struct ldb_context *ldb;
270         struct replmd_replicated_request *ac;
271
272         ldb = ldb_module_get_ctx(module);
273
274         ac = talloc_zero(req, struct replmd_replicated_request);
275         if (ac == NULL) {
276                 ldb_oom(ldb);
277                 return NULL;
278         }
279
280         ac->module = module;
281         ac->req = req;
282         return ac;
283 }
284
285 /*
286   add a time element to a record
287 */
288 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
289 {
290         struct ldb_message_element *el;
291         char *s;
292
293         if (ldb_msg_find_element(msg, attr) != NULL) {
294                 return LDB_SUCCESS;
295         }
296
297         s = ldb_timestring(msg, t);
298         if (s == NULL) {
299                 return LDB_ERR_OPERATIONS_ERROR;
300         }
301
302         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
303                 return LDB_ERR_OPERATIONS_ERROR;
304         }
305
306         el = ldb_msg_find_element(msg, attr);
307         /* always set as replace. This works because on add ops, the flag
308            is ignored */
309         el->flags = LDB_FLAG_MOD_REPLACE;
310
311         return LDB_SUCCESS;
312 }
313
314 /*
315   add a uint64_t element to a record
316 */
317 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
318 {
319         struct ldb_message_element *el;
320
321         if (ldb_msg_find_element(msg, attr) != NULL) {
322                 return LDB_SUCCESS;
323         }
324
325         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
326                 return LDB_ERR_OPERATIONS_ERROR;
327         }
328
329         el = ldb_msg_find_element(msg, attr);
330         /* always set as replace. This works because on add ops, the flag
331            is ignored */
332         el->flags = LDB_FLAG_MOD_REPLACE;
333
334         return LDB_SUCCESS;
335 }
336
337 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
338                                                    const struct replPropertyMetaData1 *m2,
339                                                    const uint32_t *rdn_attid)
340 {
341         if (m1->attid == m2->attid) {
342                 return 0;
343         }
344
345         /*
346          * the rdn attribute should be at the end!
347          * so we need to return a value greater than zero
348          * which means m1 is greater than m2
349          */
350         if (m1->attid == *rdn_attid) {
351                 return 1;
352         }
353
354         /*
355          * the rdn attribute should be at the end!
356          * so we need to return a value less than zero
357          * which means m2 is greater than m1
358          */
359         if (m2->attid == *rdn_attid) {
360                 return -1;
361         }
362
363         return m1->attid - m2->attid;
364 }
365
366 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
367                                                  const uint32_t *rdn_attid)
368 {
369         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
370                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
371 }
372
373 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
374                                                  const struct ldb_message_element *e2,
375                                                  const struct dsdb_schema *schema)
376 {
377         const struct dsdb_attribute *a1;
378         const struct dsdb_attribute *a2;
379
380         /* 
381          * TODO: make this faster by caching the dsdb_attribute pointer
382          *       on the ldb_messag_element
383          */
384
385         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
386         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
387
388         /*
389          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
390          *       in the schema
391          */
392         if (!a1 || !a2) {
393                 return strcasecmp(e1->name, e2->name);
394         }
395
396         return a1->attributeID_id - a2->attributeID_id;
397 }
398
399 static void replmd_ldb_message_sort(struct ldb_message *msg,
400                                     const struct dsdb_schema *schema)
401 {
402         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
403                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
404 }
405
406 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
407 {
408         struct ldb_context *ldb;
409         struct replmd_replicated_request *ac;
410
411         ac = talloc_get_type(req->context, struct replmd_replicated_request);
412         ldb = ldb_module_get_ctx(ac->module);
413
414         if (!ares) {
415                 return ldb_module_done(ac->req, NULL, NULL,
416                                         LDB_ERR_OPERATIONS_ERROR);
417         }
418         if (ares->error != LDB_SUCCESS) {
419                 return ldb_module_done(ac->req, ares->controls,
420                                         ares->response, ares->error);
421         }
422
423         if (ares->type != LDB_REPLY_DONE) {
424                 ldb_set_errstring(ldb,
425                                   "invalid ldb_reply_type in callback");
426                 talloc_free(ares);
427                 return ldb_module_done(ac->req, NULL, NULL,
428                                         LDB_ERR_OPERATIONS_ERROR);
429         }
430
431         return ldb_module_done(ac->req, ares->controls,
432                                 ares->response, LDB_SUCCESS);
433 }
434
435 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
436 {
437         struct ldb_context *ldb;
438         struct replmd_replicated_request *ac;
439         const struct dsdb_schema *schema;
440         enum ndr_err_code ndr_err;
441         struct ldb_request *down_req;
442         struct ldb_message *msg;
443         const struct dsdb_attribute *rdn_attr = NULL;
444         struct GUID guid;
445         struct ldb_val guid_value;
446         struct replPropertyMetaDataBlob nmd;
447         struct ldb_val nmd_value;
448         uint64_t seq_num;
449         const struct GUID *our_invocation_id;
450         time_t t = time(NULL);
451         NTTIME now;
452         char *time_str;
453         int ret;
454         uint32_t i, ni=0;
455
456         /* do not manipulate our control entries */
457         if (ldb_dn_is_special(req->op.add.message->dn)) {
458                 return ldb_next_request(module, req);
459         }
460
461         ldb = ldb_module_get_ctx(module);
462
463         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
464
465         schema = dsdb_get_schema(ldb);
466         if (!schema) {
467                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
468                               "replmd_add: no dsdb_schema loaded");
469                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
470                 return LDB_ERR_CONSTRAINT_VIOLATION;
471         }
472
473         ac = replmd_ctx_init(module, req);
474         if (!ac) {
475                 return LDB_ERR_OPERATIONS_ERROR;
476         }
477
478         ac->schema = schema;
479
480         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
481                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
482                               "replmd_add: it's not allowed to add an object with objectGUID\n");
483                 return LDB_ERR_UNWILLING_TO_PERFORM;
484         }
485
486         /* Get a sequence number from the backend */
487         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
488         if (ret != LDB_SUCCESS) {
489                 return ret;
490         }
491
492         /* a new GUID */
493         guid = GUID_random();
494
495         /* get our invocationId */
496         our_invocation_id = samdb_ntds_invocation_id(ldb);
497         if (!our_invocation_id) {
498                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
499                               "replmd_add: unable to find invocationId\n");
500                 return LDB_ERR_OPERATIONS_ERROR;
501         }
502
503         /* we have to copy the message as the caller might have it as a const */
504         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
505         if (msg == NULL) {
506                 ldb_oom(ldb);
507                 return LDB_ERR_OPERATIONS_ERROR;
508         }
509
510         /* generated times */
511         unix_to_nt_time(&now, t);
512         time_str = ldb_timestring(msg, t);
513         if (!time_str) {
514                 return LDB_ERR_OPERATIONS_ERROR;
515         }
516
517         /* 
518          * remove autogenerated attributes
519          */
520         ldb_msg_remove_attr(msg, "whenCreated");
521         ldb_msg_remove_attr(msg, "whenChanged");
522         ldb_msg_remove_attr(msg, "uSNCreated");
523         ldb_msg_remove_attr(msg, "uSNChanged");
524         ldb_msg_remove_attr(msg, "replPropertyMetaData");
525
526         if (!ldb_msg_find_element(req->op.add.message, "instanceType")) {
527                 ret = ldb_msg_add_fmt(msg, "instanceType", "%u", INSTANCE_TYPE_WRITE);
528                 if (ret != LDB_SUCCESS) {
529                         ldb_oom(ldb);
530                         return LDB_ERR_OPERATIONS_ERROR;
531                 }
532         }
533
534         /*
535          * readd replicated attributes
536          */
537         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
538         if (ret != LDB_SUCCESS) {
539                 ldb_oom(ldb);
540                 return LDB_ERR_OPERATIONS_ERROR;
541         }
542
543         /* build the replication meta_data */
544         ZERO_STRUCT(nmd);
545         nmd.version             = 1;
546         nmd.ctr.ctr1.count      = msg->num_elements;
547         nmd.ctr.ctr1.array      = talloc_array(msg,
548                                                struct replPropertyMetaData1,
549                                                nmd.ctr.ctr1.count);
550         if (!nmd.ctr.ctr1.array) {
551                 ldb_oom(ldb);
552                 return LDB_ERR_OPERATIONS_ERROR;
553         }
554
555         for (i=0; i < msg->num_elements; i++) {
556                 struct ldb_message_element *e = &msg->elements[i];
557                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
558                 const struct dsdb_attribute *sa;
559
560                 if (e->name[0] == '@') continue;
561
562                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
563                 if (!sa) {
564                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
565                                       "replmd_add: attribute '%s' not defined in schema\n",
566                                       e->name);
567                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
568                 }
569
570                 if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
571                         /* if the attribute is not replicated (0x00000001)
572                          * or constructed (0x00000004) it has no metadata
573                          */
574                         continue;
575                 }
576
577                 m->attid                        = sa->attributeID_id;
578                 m->version                      = 1;
579                 m->originating_change_time      = now;
580                 m->originating_invocation_id    = *our_invocation_id;
581                 m->originating_usn              = seq_num;
582                 m->local_usn                    = seq_num;
583                 ni++;
584
585                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
586                         rdn_attr = sa;
587                 }
588         }
589
590         /* fix meta data count */
591         nmd.ctr.ctr1.count = ni;
592
593         /*
594          * sort meta data array, and move the rdn attribute entry to the end
595          */
596         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
597
598         /* generated NDR encoded values */
599         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
600                                        NULL,
601                                        &guid,
602                                        (ndr_push_flags_fn_t)ndr_push_GUID);
603         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
604                 ldb_oom(ldb);
605                 return LDB_ERR_OPERATIONS_ERROR;
606         }
607         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
608                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
609                                        &nmd,
610                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
611         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
612                 ldb_oom(ldb);
613                 return LDB_ERR_OPERATIONS_ERROR;
614         }
615
616         /*
617          * add the autogenerated values
618          */
619         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
620         if (ret != LDB_SUCCESS) {
621                 ldb_oom(ldb);
622                 return LDB_ERR_OPERATIONS_ERROR;
623         }
624         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
625         if (ret != LDB_SUCCESS) {
626                 ldb_oom(ldb);
627                 return LDB_ERR_OPERATIONS_ERROR;
628         }
629         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
630         if (ret != LDB_SUCCESS) {
631                 ldb_oom(ldb);
632                 return LDB_ERR_OPERATIONS_ERROR;
633         }
634         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
635         if (ret != LDB_SUCCESS) {
636                 ldb_oom(ldb);
637                 return LDB_ERR_OPERATIONS_ERROR;
638         }
639         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
640         if (ret != LDB_SUCCESS) {
641                 ldb_oom(ldb);
642                 return LDB_ERR_OPERATIONS_ERROR;
643         }
644
645         /*
646          * sort the attributes by attid before storing the object
647          */
648         replmd_ldb_message_sort(msg, schema);
649
650         ret = ldb_build_add_req(&down_req, ldb, ac,
651                                 msg,
652                                 req->controls,
653                                 ac, replmd_op_callback,
654                                 req);
655         if (ret != LDB_SUCCESS) {
656                 return ret;
657         }
658
659         ret = replmd_notify(module, msg->dn, seq_num);
660         if (ret != LDB_SUCCESS) {
661                 return ret;
662         }
663
664         /* go on with the call chain */
665         return ldb_next_request(module, down_req);
666 }
667
668
669 /*
670  * update the replPropertyMetaData for one element
671  */
672 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
673                                       struct ldb_message *msg,
674                                       struct ldb_message_element *el,
675                                       struct replPropertyMetaDataBlob *omd,
676                                       struct dsdb_schema *schema,
677                                       uint64_t *seq_num,
678                                       const struct GUID *our_invocation_id,
679                                       NTTIME now)
680 {
681         int i;
682         const struct dsdb_attribute *a;
683         struct replPropertyMetaData1 *md1;
684
685         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
686         if (a == NULL) {
687                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
688                          el->name));
689                 return LDB_ERR_OPERATIONS_ERROR;
690         }
691
692         if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
693                 return LDB_SUCCESS;
694         }
695
696         for (i=0; i<omd->ctr.ctr1.count; i++) {
697                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
698         }
699         if (i == omd->ctr.ctr1.count) {
700                 /* we need to add a new one */
701                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
702                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
703                 if (omd->ctr.ctr1.array == NULL) {
704                         ldb_oom(ldb);
705                         return LDB_ERR_OPERATIONS_ERROR;
706                 }
707                 omd->ctr.ctr1.count++;
708                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
709         }
710
711         /* Get a new sequence number from the backend. We only do this
712          * if we have a change that requires a new
713          * replPropertyMetaData element 
714          */
715         if (*seq_num == 0) {
716                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
717                 if (ret != LDB_SUCCESS) {
718                         return LDB_ERR_OPERATIONS_ERROR;
719                 }
720         }
721
722         md1 = &omd->ctr.ctr1.array[i];
723         md1->version++;
724         md1->attid                     = a->attributeID_id;
725         md1->originating_change_time   = now;
726         md1->originating_invocation_id = *our_invocation_id;
727         md1->originating_usn           = *seq_num;
728         md1->local_usn                 = *seq_num;
729         
730         return LDB_SUCCESS;
731 }
732
733 /*
734  * update the replPropertyMetaData object each time we modify an
735  * object. This is needed for DRS replication, as the merge on the
736  * client is based on this object 
737  */
738 static int replmd_update_rpmd(struct ldb_module *module, 
739                               struct ldb_message *msg, uint64_t *seq_num)
740 {
741         const struct ldb_val *omd_value;
742         enum ndr_err_code ndr_err;
743         struct replPropertyMetaDataBlob omd;
744         int i;
745         struct dsdb_schema *schema;
746         time_t t = time(NULL);
747         NTTIME now;
748         const struct GUID *our_invocation_id;
749         int ret;
750         const char *attrs[] = { "replPropertyMetaData" , NULL };
751         struct ldb_result *res;
752         struct ldb_context *ldb;
753
754         ldb = ldb_module_get_ctx(module);
755
756         our_invocation_id = samdb_ntds_invocation_id(ldb);
757         if (!our_invocation_id) {
758                 /* this happens during an initial vampire while
759                    updating the schema */
760                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
761                 return LDB_SUCCESS;
762         }
763
764         unix_to_nt_time(&now, t);
765
766         /* search for the existing replPropertyMetaDataBlob */
767         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
768         if (ret != LDB_SUCCESS || res->count < 1) {
769                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
770                          ldb_dn_get_linearized(msg->dn)));
771                 return LDB_ERR_OPERATIONS_ERROR;
772         }
773                 
774
775         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
776         if (!omd_value) {
777                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
778                          ldb_dn_get_linearized(msg->dn)));
779                 return LDB_ERR_OPERATIONS_ERROR;
780         }
781
782         ndr_err = ndr_pull_struct_blob(omd_value, msg,
783                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
784                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
785         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
786                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
787                          ldb_dn_get_linearized(msg->dn)));
788                 return LDB_ERR_OPERATIONS_ERROR;
789         }
790
791         if (omd.version != 1) {
792                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
793                          omd.version, ldb_dn_get_linearized(msg->dn)));
794                 return LDB_ERR_OPERATIONS_ERROR;
795         }
796
797         schema = dsdb_get_schema(ldb);
798
799         for (i=0; i<msg->num_elements; i++) {
800                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
801                                                  our_invocation_id, now);
802                 if (ret != LDB_SUCCESS) {
803                         return ret;
804                 }
805         }
806
807         /*
808          * replmd_update_rpmd_element has done an update if the
809          * seq_num is set
810          */
811         if (*seq_num != 0) {
812                 struct ldb_val *md_value;
813                 struct ldb_message_element *el;
814                 const char *rdn_name;
815                 const struct dsdb_attribute *rdn_sa;
816
817                 rdn_name = ldb_dn_get_rdn_name(msg->dn);
818                 if (!rdn_name) {
819                         DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(msg->dn)));
820                         return LDB_ERR_OPERATIONS_ERROR;
821                 }
822                 rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
823                 if (rdn_sa == NULL) {
824                         DEBUG(0,(__location__ ": sa not found for rDN %s in %s?\n", 
825                                  rdn_name, ldb_dn_get_linearized(msg->dn)));
826                         return LDB_ERR_OPERATIONS_ERROR;
827                 }
828
829                 md_value = talloc(msg, struct ldb_val);
830                 if (md_value == NULL) {
831                         ldb_oom(ldb);
832                         return LDB_ERR_OPERATIONS_ERROR;
833                 }
834
835                 replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, &rdn_sa->attributeID_id);
836
837                 ndr_err = ndr_push_struct_blob(md_value, msg, 
838                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
839                                                &omd,
840                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
841                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
842                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
843                                  ldb_dn_get_linearized(msg->dn)));
844                         return LDB_ERR_OPERATIONS_ERROR;
845                 }
846
847                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
848                 if (ret != LDB_SUCCESS) {
849                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
850                                  ldb_dn_get_linearized(msg->dn)));
851                         return ret;
852                 }
853
854                 ret = replmd_notify(module, msg->dn, *seq_num);
855                 if (ret != LDB_SUCCESS) {
856                         return ret;
857                 }
858
859                 el->num_values = 1;
860                 el->values = md_value;
861         }
862
863         return LDB_SUCCESS;     
864 }
865
866
867 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
868 {
869         struct ldb_context *ldb;
870         struct replmd_replicated_request *ac;
871         const struct dsdb_schema *schema;
872         struct ldb_request *down_req;
873         struct ldb_message *msg;
874         int ret;
875         time_t t = time(NULL);
876         uint64_t seq_num = 0;
877
878         /* do not manipulate our control entries */
879         if (ldb_dn_is_special(req->op.mod.message->dn)) {
880                 return ldb_next_request(module, req);
881         }
882
883         ldb = ldb_module_get_ctx(module);
884
885         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
886
887         schema = dsdb_get_schema(ldb);
888         if (!schema) {
889                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
890                               "replmd_modify: no dsdb_schema loaded");
891                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
892                 return LDB_ERR_CONSTRAINT_VIOLATION;
893         }
894
895         ac = replmd_ctx_init(module, req);
896         if (!ac) {
897                 return LDB_ERR_OPERATIONS_ERROR;
898         }
899
900         ac->schema = schema;
901
902         /* we have to copy the message as the caller might have it as a const */
903         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
904         if (msg == NULL) {
905                 talloc_free(ac);
906                 return LDB_ERR_OPERATIONS_ERROR;
907         }
908
909         /* TODO:
910          * - get the whole old object
911          * - if the old object doesn't exist report an error
912          * - give an error when a readonly attribute should
913          *   be modified
914          * - merge the changed into the old object
915          *   if the caller set values to the same value
916          *   ignore the attribute, return success when no
917          *   attribute was changed
918          */
919
920         ret = replmd_update_rpmd(module, msg, &seq_num);
921         if (ret != LDB_SUCCESS) {
922                 return ret;
923         }
924
925         /* TODO:
926          * - replace the old object with the newly constructed one
927          */
928
929         ret = ldb_build_mod_req(&down_req, ldb, ac,
930                                 msg,
931                                 req->controls,
932                                 ac, replmd_op_callback,
933                                 req);
934         if (ret != LDB_SUCCESS) {
935                 return ret;
936         }
937         talloc_steal(down_req, msg);
938
939         /* we only change whenChanged and uSNChanged if the seq_num
940            has changed */
941         if (seq_num != 0) {
942                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
943                         talloc_free(ac);
944                         return LDB_ERR_OPERATIONS_ERROR;
945                 }
946
947                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
948                         talloc_free(ac);
949                         return LDB_ERR_OPERATIONS_ERROR;
950                 }
951         }
952
953         /* go on with the call chain */
954         return ldb_next_request(module, down_req);
955 }
956
957
958 /*
959   handle a rename request
960
961   On a rename we need to do an extra ldb_modify which sets the
962   whenChanged and uSNChanged attributes
963  */
964 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
965 {
966         struct ldb_context *ldb;
967         int ret, i;
968         time_t t = time(NULL);
969         uint64_t seq_num = 0;
970         struct ldb_message *msg;
971         struct replmd_private *replmd_private = 
972                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
973
974         /* do not manipulate our control entries */
975         if (ldb_dn_is_special(req->op.mod.message->dn)) {
976                 return ldb_next_request(module, req);
977         }
978
979         ldb = ldb_module_get_ctx(module);
980
981         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
982
983         /* Get a sequence number from the backend */
984         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
985         if (ret != LDB_SUCCESS) {
986                 return ret;
987         }
988
989         msg = ldb_msg_new(req);
990         if (msg == NULL) {
991                 ldb_oom(ldb);
992                 return LDB_ERR_OPERATIONS_ERROR;
993         }
994
995         msg->dn = req->op.rename.olddn;
996
997         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
998                 talloc_free(msg);
999                 return LDB_ERR_OPERATIONS_ERROR;
1000         }
1001         msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
1002
1003         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
1004                 talloc_free(msg);
1005                 return LDB_ERR_OPERATIONS_ERROR;
1006         }
1007         msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
1008
1009         ret = ldb_modify(ldb, msg);
1010         talloc_free(msg);
1011         if (ret != LDB_SUCCESS) {
1012                 return ret;
1013         }
1014
1015         ret = replmd_load_NCs(module);
1016         if (ret != 0) {
1017                 return ret;
1018         }
1019
1020         /* now update the highest uSNs of the partitions that are
1021            affected. Note that two partitions could be changing */
1022         for (i=0; i<replmd_private->num_ncs; i++) {
1023                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1024                                         req->op.rename.olddn) == 0) {
1025                         break;
1026                 }
1027         }
1028         if (i == replmd_private->num_ncs) {
1029                 DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
1030                          ldb_dn_get_linearized(req->op.rename.olddn)));
1031                 return LDB_ERR_OPERATIONS_ERROR;
1032         }
1033         replmd_private->ncs[i].mod_usn = seq_num;
1034
1035         for (i=0; i<replmd_private->num_ncs; i++) {
1036                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
1037                                         req->op.rename.newdn) == 0) {
1038                         break;
1039                 }
1040         }
1041         if (i == replmd_private->num_ncs) {
1042                 DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
1043                          ldb_dn_get_linearized(req->op.rename.newdn)));
1044                 return LDB_ERR_OPERATIONS_ERROR;
1045         }
1046         replmd_private->ncs[i].mod_usn = seq_num;
1047         
1048         /* go on with the call chain */
1049         return ldb_next_request(module, req);
1050 }
1051
1052
/*
 * Turn an LDB error code into the value a replicated-request step
 * returns.  At present the code is handed back unchanged and the request
 * context is not consulted.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	const int ldb_err = ret;

	return ldb_err;
}
1057
1058 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1059 {
1060         int ret = LDB_ERR_OTHER;
1061         /* TODO: do some error mapping */
1062         return ret;
1063 }
1064
1065 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1066
1067 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1068                                                 struct ldb_reply *ares)
1069 {
1070         struct ldb_context *ldb;
1071         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1072                                                struct replmd_replicated_request);
1073         int ret;
1074
1075         ldb = ldb_module_get_ctx(ar->module);
1076
1077         if (!ares) {
1078                 return ldb_module_done(ar->req, NULL, NULL,
1079                                         LDB_ERR_OPERATIONS_ERROR);
1080         }
1081         if (ares->error != LDB_SUCCESS) {
1082                 return ldb_module_done(ar->req, ares->controls,
1083                                         ares->response, ares->error);
1084         }
1085
1086         if (ares->type != LDB_REPLY_DONE) {
1087                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1088                 return ldb_module_done(ar->req, NULL, NULL,
1089                                         LDB_ERR_OPERATIONS_ERROR);
1090         }
1091
1092         talloc_free(ares);
1093         ar->index_current++;
1094
1095         ret = replmd_replicated_apply_next(ar);
1096         if (ret != LDB_SUCCESS) {
1097                 return ldb_module_done(ar->req, NULL, NULL, ret);
1098         }
1099
1100         return LDB_SUCCESS;
1101 }
1102
1103 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1104 {
1105         struct ldb_context *ldb;
1106         struct ldb_request *change_req;
1107         enum ndr_err_code ndr_err;
1108         struct ldb_message *msg;
1109         struct replPropertyMetaDataBlob *md;
1110         struct ldb_val md_value;
1111         uint32_t i;
1112         uint64_t seq_num;
1113         int ret;
1114
1115         /*
1116          * TODO: check if the parent object exist
1117          */
1118
1119         /*
1120          * TODO: handle the conflict case where an object with the
1121          *       same name exist
1122          */
1123
1124         ldb = ldb_module_get_ctx(ar->module);
1125         msg = ar->objs->objects[ar->index_current].msg;
1126         md = ar->objs->objects[ar->index_current].meta_data;
1127
1128         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1129         if (ret != LDB_SUCCESS) {
1130                 return replmd_replicated_request_error(ar, ret);
1131         }
1132
1133         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1134         if (ret != LDB_SUCCESS) {
1135                 return replmd_replicated_request_error(ar, ret);
1136         }
1137
1138         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1139         if (ret != LDB_SUCCESS) {
1140                 return replmd_replicated_request_error(ar, ret);
1141         }
1142
1143         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1144         if (ret != LDB_SUCCESS) {
1145                 return replmd_replicated_request_error(ar, ret);
1146         }
1147
1148         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1149         if (ret != LDB_SUCCESS) {
1150                 return replmd_replicated_request_error(ar, ret);
1151         }
1152
1153         ret = replmd_notify(ar->module, msg->dn, seq_num);
1154         if (ret != LDB_SUCCESS) {
1155                 return replmd_replicated_request_error(ar, ret);
1156         }
1157
1158         /* remove any message elements that have zero values */
1159         for (i=0; i<msg->num_elements; i++) {
1160                 if (msg->elements[i].num_values == 0) {
1161                         DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
1162                                  msg->elements[i].name));
1163                         memmove(&msg->elements[i], 
1164                                 &msg->elements[i+1], 
1165                                 sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
1166                         msg->num_elements--;
1167                         i--;
1168                 }
1169         }
1170         
1171         /*
1172          * the meta data array is already sorted by the caller
1173          */
1174         for (i=0; i < md->ctr.ctr1.count; i++) {
1175                 md->ctr.ctr1.array[i].local_usn = seq_num;
1176         }
1177         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1178                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1179                                        md,
1180                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1181         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1182                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1183                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1184         }
1185         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1186         if (ret != LDB_SUCCESS) {
1187                 return replmd_replicated_request_error(ar, ret);
1188         }
1189
1190         replmd_ldb_message_sort(msg, ar->schema);
1191
1192         if (DEBUGLVL(4)) {
1193                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1194                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1195                 talloc_free(s);
1196         }
1197
1198         ret = ldb_build_add_req(&change_req,
1199                                 ldb,
1200                                 ar,
1201                                 msg,
1202                                 ar->controls,
1203                                 ar,
1204                                 replmd_replicated_apply_add_callback,
1205                                 ar->req);
1206         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1207
1208         return ldb_next_request(ar->module, change_req);
1209 }
1210
1211 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1212                                                          struct replPropertyMetaData1 *m2)
1213 {
1214         int ret;
1215
1216         if (m1->version != m2->version) {
1217                 return m1->version - m2->version;
1218         }
1219
1220         if (m1->originating_change_time != m2->originating_change_time) {
1221                 return m1->originating_change_time - m2->originating_change_time;
1222         }
1223
1224         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1225         if (ret != 0) {
1226                 return ret;
1227         }
1228
1229         return m1->originating_usn - m2->originating_usn;
1230 }
1231
1232 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1233                                                   struct ldb_reply *ares)
1234 {
1235         struct ldb_context *ldb;
1236         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1237                                                struct replmd_replicated_request);
1238         int ret;
1239
1240         ldb = ldb_module_get_ctx(ar->module);
1241
1242         if (!ares) {
1243                 return ldb_module_done(ar->req, NULL, NULL,
1244                                         LDB_ERR_OPERATIONS_ERROR);
1245         }
1246         if (ares->error != LDB_SUCCESS) {
1247                 return ldb_module_done(ar->req, ares->controls,
1248                                         ares->response, ares->error);
1249         }
1250
1251         if (ares->type != LDB_REPLY_DONE) {
1252                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1253                 return ldb_module_done(ar->req, NULL, NULL,
1254                                         LDB_ERR_OPERATIONS_ERROR);
1255         }
1256
1257         talloc_free(ares);
1258         ar->index_current++;
1259
1260         ret = replmd_replicated_apply_next(ar);
1261         if (ret != LDB_SUCCESS) {
1262                 return ldb_module_done(ar->req, NULL, NULL, ret);
1263         }
1264
1265         return LDB_SUCCESS;
1266 }
1267
1268 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1269 {
1270         struct ldb_context *ldb;
1271         struct ldb_request *change_req;
1272         enum ndr_err_code ndr_err;
1273         struct ldb_message *msg;
1274         struct replPropertyMetaDataBlob *rmd;
1275         struct replPropertyMetaDataBlob omd;
1276         const struct ldb_val *omd_value;
1277         struct replPropertyMetaDataBlob nmd;
1278         struct ldb_val nmd_value;
1279         uint32_t i,j,ni=0;
1280         uint32_t removed_attrs = 0;
1281         uint64_t seq_num;
1282         int ret;
1283
1284         ldb = ldb_module_get_ctx(ar->module);
1285         msg = ar->objs->objects[ar->index_current].msg;
1286         rmd = ar->objs->objects[ar->index_current].meta_data;
1287         ZERO_STRUCT(omd);
1288         omd.version = 1;
1289
1290         /*
1291          * TODO: check repl data is correct after a rename
1292          */
1293         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1294                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1295                           ldb_dn_get_linearized(ar->search_msg->dn),
1296                           ldb_dn_get_linearized(msg->dn));
1297                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1298                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1299                                   ldb_dn_get_linearized(ar->search_msg->dn),
1300                                   ldb_dn_get_linearized(msg->dn),
1301                                   ldb_errstring(ldb));
1302                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1303                 }
1304         }
1305
1306         /* find existing meta data */
1307         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1308         if (omd_value) {
1309                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1310                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1311                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1312                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1313                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1314                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1315                 }
1316
1317                 if (omd.version != 1) {
1318                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1319                 }
1320         }
1321
1322         ZERO_STRUCT(nmd);
1323         nmd.version = 1;
1324         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1325         nmd.ctr.ctr1.array = talloc_array(ar,
1326                                           struct replPropertyMetaData1,
1327                                           nmd.ctr.ctr1.count);
1328         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1329
1330         /* first copy the old meta data */
1331         for (i=0; i < omd.ctr.ctr1.count; i++) {
1332                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1333                 ni++;
1334         }
1335
1336         /* now merge in the new meta data */
1337         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1338                 bool found = false;
1339
1340                 for (j=0; j < ni; j++) {
1341                         int cmp;
1342
1343                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1344                                 continue;
1345                         }
1346
1347                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1348                                                                             &nmd.ctr.ctr1.array[j]);
1349                         if (cmp > 0) {
1350                                 /* replace the entry */
1351                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1352                                 found = true;
1353                                 break;
1354                         }
1355
1356                         /* we don't want to apply this change so remove the attribute */
1357                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1358                         removed_attrs++;
1359
1360                         found = true;
1361                         break;
1362                 }
1363
1364                 if (found) continue;
1365
1366                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1367                 ni++;
1368         }
1369
1370         /*
1371          * finally correct the size of the meta_data array
1372          */
1373         nmd.ctr.ctr1.count = ni;
1374
1375         /*
1376          * the rdn attribute (the alias for the name attribute),
1377          * 'cn' for most objects is the last entry in the meta data array
1378          * we have stored
1379          *
1380          * sort the new meta data array
1381          */
1382         {
1383                 struct replPropertyMetaData1 *rdn_p;
1384                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1385
1386                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1387                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1388         }
1389
1390         /*
1391          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1392          */
1393         if (msg->num_elements == 0) {
1394                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1395                           ar->index_current);
1396
1397                 ar->index_current++;
1398                 return replmd_replicated_apply_next(ar);
1399         }
1400
1401         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1402                   ar->index_current, msg->num_elements);
1403
1404         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1405         if (ret != LDB_SUCCESS) {
1406                 return replmd_replicated_request_error(ar, ret);
1407         }
1408
1409         for (i=0; i<ni; i++) {
1410                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1411         }
1412
1413         /* create the meta data value */
1414         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1415                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1416                                        &nmd,
1417                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1418         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1419                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1420                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1421         }
1422
1423         /*
1424          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1425          * and replPopertyMetaData attributes
1426          */
1427         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1428         if (ret != LDB_SUCCESS) {
1429                 return replmd_replicated_request_error(ar, ret);
1430         }
1431         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1432         if (ret != LDB_SUCCESS) {
1433                 return replmd_replicated_request_error(ar, ret);
1434         }
1435         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1436         if (ret != LDB_SUCCESS) {
1437                 return replmd_replicated_request_error(ar, ret);
1438         }
1439
1440         replmd_ldb_message_sort(msg, ar->schema);
1441
1442         /* we want to replace the old values */
1443         for (i=0; i < msg->num_elements; i++) {
1444                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1445         }
1446
1447         ret = replmd_notify(ar->module, msg->dn, seq_num);
1448         if (ret != LDB_SUCCESS) {
1449                 return replmd_replicated_request_error(ar, ret);
1450         }
1451
1452         if (DEBUGLVL(4)) {
1453                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1454                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1455                 talloc_free(s);
1456         }
1457
1458         ret = ldb_build_mod_req(&change_req,
1459                                 ldb,
1460                                 ar,
1461                                 msg,
1462                                 ar->controls,
1463                                 ar,
1464                                 replmd_replicated_apply_merge_callback,
1465                                 ar->req);
1466         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1467
1468         return ldb_next_request(ar->module, change_req);
1469 }
1470
1471 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1472                                                    struct ldb_reply *ares)
1473 {
1474         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1475                                                struct replmd_replicated_request);
1476         int ret;
1477
1478         if (!ares) {
1479                 return ldb_module_done(ar->req, NULL, NULL,
1480                                         LDB_ERR_OPERATIONS_ERROR);
1481         }
1482         if (ares->error != LDB_SUCCESS &&
1483             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1484                 return ldb_module_done(ar->req, ares->controls,
1485                                         ares->response, ares->error);
1486         }
1487
1488         switch (ares->type) {
1489         case LDB_REPLY_ENTRY:
1490                 ar->search_msg = talloc_steal(ar, ares->message);
1491                 break;
1492
1493         case LDB_REPLY_REFERRAL:
1494                 /* we ignore referrals */
1495                 break;
1496
1497         case LDB_REPLY_DONE:
1498                 if (ar->search_msg != NULL) {
1499                         ret = replmd_replicated_apply_merge(ar);
1500                 } else {
1501                         ret = replmd_replicated_apply_add(ar);
1502                 }
1503                 if (ret != LDB_SUCCESS) {
1504                         return ldb_module_done(ar->req, NULL, NULL, ret);
1505                 }
1506         }
1507
1508         talloc_free(ares);
1509         return LDB_SUCCESS;
1510 }
1511
1512 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1513
1514 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1515 {
1516         struct ldb_context *ldb;
1517         int ret;
1518         char *tmp_str;
1519         char *filter;
1520         struct ldb_request *search_req;
1521
1522         if (ar->index_current >= ar->objs->num_objects) {
1523                 /* done with it, go to next stage */
1524                 return replmd_replicated_uptodate_vector(ar);
1525         }
1526
1527         ldb = ldb_module_get_ctx(ar->module);
1528         ar->search_msg = NULL;
1529
1530         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1531         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1532
1533         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1534         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1535         talloc_free(tmp_str);
1536
1537         ret = ldb_build_search_req(&search_req,
1538                                    ldb,
1539                                    ar,
1540                                    ar->objs->partition_dn,
1541                                    LDB_SCOPE_SUBTREE,
1542                                    filter,
1543                                    NULL,
1544                                    NULL,
1545                                    ar,
1546                                    replmd_replicated_apply_search_callback,
1547                                    ar->req);
1548         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1549
1550         return ldb_next_request(ar->module, search_req);
1551 }
1552
1553 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1554                                                       struct ldb_reply *ares)
1555 {
1556         struct ldb_context *ldb;
1557         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1558                                                struct replmd_replicated_request);
1559         ldb = ldb_module_get_ctx(ar->module);
1560
1561         if (!ares) {
1562                 return ldb_module_done(ar->req, NULL, NULL,
1563                                         LDB_ERR_OPERATIONS_ERROR);
1564         }
1565         if (ares->error != LDB_SUCCESS) {
1566                 return ldb_module_done(ar->req, ares->controls,
1567                                         ares->response, ares->error);
1568         }
1569
1570         if (ares->type != LDB_REPLY_DONE) {
1571                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1572                 return ldb_module_done(ar->req, NULL, NULL,
1573                                         LDB_ERR_OPERATIONS_ERROR);
1574         }
1575
1576         talloc_free(ares);
1577
1578         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1579 }
1580
1581 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1582 {
1583         struct ldb_context *ldb;
1584         struct ldb_request *change_req;
1585         enum ndr_err_code ndr_err;
1586         struct ldb_message *msg;
1587         struct replUpToDateVectorBlob ouv;
1588         const struct ldb_val *ouv_value;
1589         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1590         struct replUpToDateVectorBlob nuv;
1591         struct ldb_val nuv_value;
1592         struct ldb_message_element *nuv_el = NULL;
1593         const struct GUID *our_invocation_id;
1594         struct ldb_message_element *orf_el = NULL;
1595         struct repsFromToBlob nrf;
1596         struct ldb_val *nrf_value = NULL;
1597         struct ldb_message_element *nrf_el = NULL;
1598         uint32_t i,j,ni=0;
1599         bool found = false;
1600         time_t t = time(NULL);
1601         NTTIME now;
1602         int ret;
1603
1604         ldb = ldb_module_get_ctx(ar->module);
1605         ruv = ar->objs->uptodateness_vector;
1606         ZERO_STRUCT(ouv);
1607         ouv.version = 2;
1608         ZERO_STRUCT(nuv);
1609         nuv.version = 2;
1610
1611         unix_to_nt_time(&now, t);
1612
1613         /*
1614          * first create the new replUpToDateVector
1615          */
1616         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1617         if (ouv_value) {
1618                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1619                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1620                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1621                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1622                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1623                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1624                 }
1625
1626                 if (ouv.version != 2) {
1627                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1628                 }
1629         }
1630
1631         /*
1632          * the new uptodateness vector will at least
1633          * contain 1 entry, one for the source_dsa
1634          *
1635          * plus optional values from our old vector and the one from the source_dsa
1636          */
1637         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1638         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1639         nuv.ctr.ctr2.cursors = talloc_array(ar,
1640                                             struct drsuapi_DsReplicaCursor2,
1641                                             nuv.ctr.ctr2.count);
1642         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1643
1644         /* first copy the old vector */
1645         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1646                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1647                 ni++;
1648         }
1649
1650         /* get our invocation_id if we have one already attached to the ldb */
1651         our_invocation_id = samdb_ntds_invocation_id(ldb);
1652
1653         /* merge in the source_dsa vector is available */
1654         for (i=0; (ruv && i < ruv->count); i++) {
1655                 found = false;
1656
1657                 if (our_invocation_id &&
1658                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1659                                our_invocation_id)) {
1660                         continue;
1661                 }
1662
1663                 for (j=0; j < ni; j++) {
1664                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1665                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1666                                 continue;
1667                         }
1668
1669                         found = true;
1670
1671                         /*
1672                          * we update only the highest_usn and not the latest_sync_success time,
1673                          * because the last success stands for direct replication
1674                          */
1675                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1676                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1677                         }
1678                         break;                  
1679                 }
1680
1681                 if (found) continue;
1682
1683                 /* if it's not there yet, add it */
1684                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1685                 ni++;
1686         }
1687
1688         /*
1689          * merge in the current highwatermark for the source_dsa
1690          */
1691         found = false;
1692         for (j=0; j < ni; j++) {
1693                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1694                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1695                         continue;
1696                 }
1697
1698                 found = true;
1699
1700                 /*
1701                  * here we update the highest_usn and last_sync_success time
1702                  * because we're directly replicating from the source_dsa
1703                  *
1704                  * and use the tmp_highest_usn because this is what we have just applied
1705                  * to our ldb
1706                  */
1707                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1708                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1709                 break;
1710         }
1711         if (!found) {
1712                 /*
1713                  * here we update the highest_usn and last_sync_success time
1714                  * because we're directly replicating from the source_dsa
1715                  *
1716                  * and use the tmp_highest_usn because this is what we have just applied
1717                  * to our ldb
1718                  */
1719                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1720                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1721                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1722                 ni++;
1723         }
1724
1725         /*
1726          * finally correct the size of the cursors array
1727          */
1728         nuv.ctr.ctr2.count = ni;
1729
1730         /*
1731          * sort the cursors
1732          */
1733         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1734               sizeof(struct drsuapi_DsReplicaCursor2),
1735               (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
1736
1737         /*
1738          * create the change ldb_message
1739          */
1740         msg = ldb_msg_new(ar);
1741         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1742         msg->dn = ar->search_msg->dn;
1743
1744         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1745                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1746                                        &nuv,
1747                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1748         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1749                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1750                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1751         }
1752         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1753         if (ret != LDB_SUCCESS) {
1754                 return replmd_replicated_request_error(ar, ret);
1755         }
1756         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1757
1758         /*
1759          * now create the new repsFrom value from the given repsFromTo1 structure
1760          */
1761         ZERO_STRUCT(nrf);
1762         nrf.version                                     = 1;
1763         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1764         /* and fix some values... */
1765         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1766         nrf.ctr.ctr1.last_success                       = now;
1767         nrf.ctr.ctr1.last_attempt                       = now;
1768         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1769         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1770
1771         /*
1772          * first see if we already have a repsFrom value for the current source dsa
1773          * if so we'll later replace this value
1774          */
1775         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1776         if (orf_el) {
1777                 for (i=0; i < orf_el->num_values; i++) {
1778                         struct repsFromToBlob *trf;
1779
1780                         trf = talloc(ar, struct repsFromToBlob);
1781                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1782
1783                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1784                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1785                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1786                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1787                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1788                         }
1789
1790                         if (trf->version != 1) {
1791                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1792                         }
1793
1794                         /*
1795                          * we compare the source dsa objectGUID not the invocation_id
1796                          * because we want only one repsFrom value per source dsa
1797                          * and when the invocation_id of the source dsa has changed we don't need 
1798                          * the old repsFrom with the old invocation_id
1799                          */
1800                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1801                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1802                                 talloc_free(trf);
1803                                 continue;
1804                         }
1805
1806                         talloc_free(trf);
1807                         nrf_value = &orf_el->values[i];
1808                         break;
1809                 }
1810
1811                 /*
1812                  * copy over all old values to the new ldb_message
1813                  */
1814                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1815                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1816                 *nrf_el = *orf_el;
1817         }
1818
1819         /*
1820          * if we haven't found an old repsFrom value for the current source dsa
1821          * we'll add a new value
1822          */
1823         if (!nrf_value) {
1824                 struct ldb_val zero_value;
1825                 ZERO_STRUCT(zero_value);
1826                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1827                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1828
1829                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1830         }
1831
1832         /* we now fill the value which is already attached to ldb_message */
1833         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1834                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1835                                        &nrf,
1836                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1837         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1838                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1839                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1840         }
1841
1842         /* 
1843          * the ldb_message_element for the attribute, has all the old values and the new one
1844          * so we'll replace the whole attribute with all values
1845          */
1846         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1847
1848         if (DEBUGLVL(4)) {
1849                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1850                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1851                 talloc_free(s);
1852         }
1853
1854         /* prepare the ldb_modify() request */
1855         ret = ldb_build_mod_req(&change_req,
1856                                 ldb,
1857                                 ar,
1858                                 msg,
1859                                 ar->controls,
1860                                 ar,
1861                                 replmd_replicated_uptodate_modify_callback,
1862                                 ar->req);
1863         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1864
1865         return ldb_next_request(ar->module, change_req);
1866 }
1867
1868 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1869                                                       struct ldb_reply *ares)
1870 {
1871         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1872                                                struct replmd_replicated_request);
1873         int ret;
1874
1875         if (!ares) {
1876                 return ldb_module_done(ar->req, NULL, NULL,
1877                                         LDB_ERR_OPERATIONS_ERROR);
1878         }
1879         if (ares->error != LDB_SUCCESS &&
1880             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1881                 return ldb_module_done(ar->req, ares->controls,
1882                                         ares->response, ares->error);
1883         }
1884
1885         switch (ares->type) {
1886         case LDB_REPLY_ENTRY:
1887                 ar->search_msg = talloc_steal(ar, ares->message);
1888                 break;
1889
1890         case LDB_REPLY_REFERRAL:
1891                 /* we ignore referrals */
1892                 break;
1893
1894         case LDB_REPLY_DONE:
1895                 if (ar->search_msg == NULL) {
1896                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1897                 } else {
1898                         ret = replmd_replicated_uptodate_modify(ar);
1899                 }
1900                 if (ret != LDB_SUCCESS) {
1901                         return ldb_module_done(ar->req, NULL, NULL, ret);
1902                 }
1903         }
1904
1905         talloc_free(ares);
1906         return LDB_SUCCESS;
1907 }
1908
1909
1910 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1911 {
1912         struct ldb_context *ldb;
1913         int ret;
1914         static const char *attrs[] = {
1915                 "replUpToDateVector",
1916                 "repsFrom",
1917                 NULL
1918         };
1919         struct ldb_request *search_req;
1920
1921         ldb = ldb_module_get_ctx(ar->module);
1922         ar->search_msg = NULL;
1923
1924         ret = ldb_build_search_req(&search_req,
1925                                    ldb,
1926                                    ar,
1927                                    ar->objs->partition_dn,
1928                                    LDB_SCOPE_BASE,
1929                                    "(objectClass=*)",
1930                                    attrs,
1931                                    NULL,
1932                                    ar,
1933                                    replmd_replicated_uptodate_search_callback,
1934                                    ar->req);
1935         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1936
1937         return ldb_next_request(ar->module, search_req);
1938 }
1939
1940
1941
1942 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1943 {
1944         struct ldb_context *ldb;
1945         struct dsdb_extended_replicated_objects *objs;
1946         struct replmd_replicated_request *ar;
1947         struct ldb_control **ctrls;
1948         int ret, i;
1949         struct dsdb_control_current_partition *partition_ctrl;
1950         struct replmd_private *replmd_private = 
1951                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1952
1953         ldb = ldb_module_get_ctx(module);
1954
1955         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1956
1957         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1958         if (!objs) {
1959                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1960                 return LDB_ERR_PROTOCOL_ERROR;
1961         }
1962
1963         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1964                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1965                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1966                 return LDB_ERR_PROTOCOL_ERROR;
1967         }
1968
1969         ar = replmd_ctx_init(module, req);
1970         if (!ar)
1971                 return LDB_ERR_OPERATIONS_ERROR;
1972
1973         ar->objs = objs;
1974         ar->schema = dsdb_get_schema(ldb);
1975         if (!ar->schema) {
1976                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1977                 talloc_free(ar);
1978                 DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
1979                 return LDB_ERR_CONSTRAINT_VIOLATION;
1980         }
1981
1982         ctrls = req->controls;
1983
1984         if (req->controls) {
1985                 req->controls = talloc_memdup(ar, req->controls,
1986                                               talloc_get_size(req->controls));
1987                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1988         }
1989
1990         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1991         if (ret != LDB_SUCCESS) {
1992                 return ret;
1993         }
1994
1995         /*
1996           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1997           tells the partition module which partition this request is
1998           directed at. That is important as the partition roots appear
1999           twice in the directory, once as mount points in the top
2000           level store, and once as the roots of each partition. The
2001           replication code wants to operate on the root of the
2002           partitions, not the top level mount points
2003          */
2004         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
2005         if (partition_ctrl == NULL) {
2006                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
2007         }
2008         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
2009         partition_ctrl->dn = objs->partition_dn;
2010
2011         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
2012         if (ret != LDB_SUCCESS) {
2013                 return ret;
2014         }
2015
2016         ar->controls = req->controls;
2017         req->controls = ctrls;
2018
2019         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
2020
2021         /* save away the linked attributes for the end of the
2022            transaction */
2023         for (i=0; i<ar->objs->linked_attributes_count; i++) {
2024                 struct la_entry *la_entry;
2025
2026                 if (replmd_private->la_ctx == NULL) {
2027                         replmd_private->la_ctx = talloc_new(replmd_private);
2028                 }
2029                 la_entry = talloc(replmd_private->la_ctx, struct la_entry);
2030                 if (la_entry == NULL) {
2031                         ldb_oom(ldb);
2032                         return LDB_ERR_OPERATIONS_ERROR;
2033                 }
2034                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
2035                 if (la_entry->la == NULL) {
2036                         talloc_free(la_entry);
2037                         ldb_oom(ldb);
2038                         return LDB_ERR_OPERATIONS_ERROR;
2039                 }
2040                 *la_entry->la = ar->objs->linked_attributes[i];
2041
2042                 /* we need to steal the non-scalars so they stay
2043                    around until the end of the transaction */
2044                 talloc_steal(la_entry->la, la_entry->la->identifier);
2045                 talloc_steal(la_entry->la, la_entry->la->value.blob);
2046
2047                 DLIST_ADD(replmd_private->la_list, la_entry);
2048         }
2049
2050         return replmd_replicated_apply_next(ar);
2051 }
2052
2053 /*
2054   process one linked attribute structure
2055  */
2056 static int replmd_process_linked_attribute(struct ldb_module *module,
2057                                            struct la_entry *la_entry)
2058 {                                          
2059         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
2060         struct ldb_context *ldb = ldb_module_get_ctx(module);
2061         struct drsuapi_DsReplicaObjectIdentifier3 target;
2062         struct ldb_message *msg;
2063         struct ldb_message_element *ret_el;
2064         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2065         enum ndr_err_code ndr_err;
2066         char *target_dn;
2067         struct ldb_request *mod_req;
2068         int ret;
2069         const struct dsdb_attribute *attr;
2070
2071 /*
2072 linked_attributes[0]:                                                     
2073      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2074         identifier               : *                                      
2075             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2076                 __ndr_size               : 0x0000003a (58)                
2077                 __ndr_size_sid           : 0x00000000 (0)                 
2078                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2079                 sid                      : S-0-0                               
2080                 __ndr_size_dn            : 0x00000000 (0)                      
2081                 dn                       : ''                                  
2082         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2083         value: struct drsuapi_DsAttributeValue                                 
2084             __ndr_size               : 0x0000007e (126)                        
2085             blob                     : *                                       
2086                 blob                     : DATA_BLOB length=126                
2087         flags                    : 0x00000001 (1)                              
2088                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2089         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2090         meta_data: struct drsuapi_DsReplicaMetaData                            
2091             version                  : 0x00000015 (21)                         
2092             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2093             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2094             originating_usn          : 0x000000000001e19c (123292)             
2095      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2096         __ndr_size               : 0x0000007e (126)                            
2097         __ndr_size_sid           : 0x0000001c (28)                             
2098         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2099         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2100         __ndr_size_dn            : 0x00000022 (34)                                
2101         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2102  */
2103         if (DEBUGLVL(4)) {
2104                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2105         }
2106         
2107         /* decode the target of the link */
2108         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2109                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2110                                        &target,
2111                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2112         if (ndr_err != NDR_ERR_SUCCESS) {
2113                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2114                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2115                 talloc_free(tmp_ctx);
2116                 return LDB_ERR_OPERATIONS_ERROR;
2117         }
2118         if (DEBUGLVL(4)) {
2119                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2120         }
2121
2122         /* construct a modify request for this attribute change */
2123         msg = ldb_msg_new(tmp_ctx);
2124         if (!msg) {
2125                 ldb_oom(ldb);
2126                 talloc_free(tmp_ctx);
2127                 return LDB_ERR_OPERATIONS_ERROR;
2128         }
2129
2130         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2131                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2132         if (ret != LDB_SUCCESS) {
2133                 talloc_free(tmp_ctx);
2134                 return ret;
2135         }
2136
2137         /* find the attribute being modified */
2138         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2139         if (attr == NULL) {
2140                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2141                 talloc_free(tmp_ctx);
2142                 return LDB_ERR_OPERATIONS_ERROR;
2143         }
2144
2145         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2146                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2147                                         LDB_FLAG_MOD_ADD, &ret_el);
2148         } else {
2149                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2150                                         LDB_FLAG_MOD_DELETE, &ret_el);
2151         }
2152         if (ret != LDB_SUCCESS) {
2153                 talloc_free(tmp_ctx);
2154                 return ret;
2155         }
2156         /* we allocate two entries here, in case we need a remove/add
2157            pair */
2158         ret_el->values = talloc_array(msg, struct ldb_val, 2);
2159         if (!ret_el->values) {
2160                 ldb_oom(ldb);
2161                 talloc_free(tmp_ctx);
2162                 return LDB_ERR_OPERATIONS_ERROR;
2163         }
2164         ret_el->num_values = 1;
2165
2166         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
2167                                     GUID_string(tmp_ctx, &target.guid),
2168                                     dom_sid_string(tmp_ctx, &target.sid),
2169                                     target.dn);
2170         if (target_dn == NULL) {
2171                 ldb_oom(ldb);
2172                 talloc_free(tmp_ctx);
2173                 return LDB_ERR_OPERATIONS_ERROR;
2174         }
2175         ret_el->values[0] = data_blob_string_const(target_dn);
2176
2177         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2178                                 msg,
2179                                 NULL,
2180                                 NULL, 
2181                                 ldb_op_default_callback,
2182                                 NULL);
2183         if (ret != LDB_SUCCESS) {
2184                 talloc_free(tmp_ctx);
2185                 return ret;
2186         }
2187         talloc_steal(mod_req, msg);
2188
2189         if (DEBUGLVL(4)) {
2190                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2191                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2192         }
2193
2194         /* Run the new request */
2195         ret = ldb_next_request(module, mod_req);
2196
2197         /* we need to wait for this to finish, as we are being called
2198            from the synchronous end_transaction hook of this module */
2199         if (ret == LDB_SUCCESS) {
2200                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2201         }
2202
2203         if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
2204                 /* the link destination exists, we need to update it
2205                  * by deleting the old one for the same DN then adding
2206                  * the new one */
2207                 msg->elements = talloc_realloc(msg, msg->elements,
2208                                                struct ldb_message_element,
2209                                                msg->num_elements+1);
2210                 if (msg->elements == NULL) {
2211                         ldb_oom(ldb);
2212                         talloc_free(tmp_ctx);
2213                         return LDB_ERR_OPERATIONS_ERROR;
2214                 }
2215                 /* this relies on the backend matching the old entry
2216                    only by the DN portion of the extended DN */
2217                 msg->elements[1] = msg->elements[0];
2218                 msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
2219                 msg->num_elements++;
2220
2221                 ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2222                                         msg,
2223                                         NULL,
2224                                         NULL, 
2225                                         ldb_op_default_callback,
2226                                         NULL);
2227                 if (ret != LDB_SUCCESS) {
2228                         talloc_free(tmp_ctx);
2229                         return ret;
2230                 }
2231
2232                 /* Run the new request */
2233                 ret = ldb_next_request(module, mod_req);
2234                 
2235                 if (ret == LDB_SUCCESS) {
2236                         ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2237                 }
2238         }
2239
2240         if (ret != LDB_SUCCESS) {
2241                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2242                           ldb_errstring(ldb),
2243                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2244                 ret = LDB_SUCCESS;
2245         }
2246         
2247         talloc_free(tmp_ctx);
2248
2249         return ret;     
2250 }
2251
2252 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2253 {
2254         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2255                 return replmd_extended_replicated_objects(module, req);
2256         }
2257
2258         return ldb_next_request(module, req);
2259 }
2260
2261
2262 /*
2263   we hook into the transaction operations to allow us to 
2264   perform the linked attribute updates at the end of the whole
2265   transaction. This allows a forward linked attribute to be created
2266   before the object is created. During a vampire, w2k8 sends us linked
2267   attributes before the objects they are part of.
2268  */
2269 static int replmd_start_transaction(struct ldb_module *module)
2270 {
2271         /* create our private structure for this transaction */
2272         int i;
2273         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2274                                                                 struct replmd_private);
2275         talloc_free(replmd_private->la_ctx);
2276         replmd_private->la_list = NULL;
2277         replmd_private->la_ctx = NULL;
2278
2279         for (i=0; i<replmd_private->num_ncs; i++) {
2280                 replmd_private->ncs[i].mod_usn = 0;
2281         }
2282
2283         return ldb_next_start_trans(module);
2284 }
2285
2286 /*
2287   on prepare commit we loop over our queued la_context structures and
2288   apply each of them  
2289  */
2290 static int replmd_prepare_commit(struct ldb_module *module)
2291 {
2292         struct replmd_private *replmd_private = 
2293                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2294         struct la_entry *la, *prev;
2295         int ret;
2296
2297         /* walk the list backwards, to do the first entry first, as we
2298          * added the entries with DLIST_ADD() which puts them at the
2299          * start of the list */
2300         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2301
2302         for (; la; la=prev) {
2303                 prev = la->prev;
2304                 DLIST_REMOVE(replmd_private->la_list, la);
2305                 ret = replmd_process_linked_attribute(module, la);
2306                 if (ret != LDB_SUCCESS) {
2307                         return ret;
2308                 }
2309         }
2310
2311         talloc_free(replmd_private->la_ctx);
2312         replmd_private->la_list = NULL;
2313         replmd_private->la_ctx = NULL;
2314
2315         /* possibly change @REPLCHANGED */
2316         ret = replmd_notify_store(module);
2317         if (ret != LDB_SUCCESS) {
2318                 return ret;
2319         }
2320         
2321         return ldb_next_prepare_commit(module);
2322 }
2323
2324 static int replmd_del_transaction(struct ldb_module *module)
2325 {
2326         struct replmd_private *replmd_private = 
2327                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2328         talloc_free(replmd_private->la_ctx);
2329         replmd_private->la_list = NULL;
2330         replmd_private->la_ctx = NULL;
2331         return ldb_next_del_trans(module);
2332 }
2333
2334
2335 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2336         .name          = "repl_meta_data",
2337         .init_context      = replmd_init,
2338         .add               = replmd_add,
2339         .modify            = replmd_modify,
2340         .rename            = replmd_rename,
2341         .extended          = replmd_extended,
2342         .start_transaction = replmd_start_transaction,
2343         .prepare_commit    = replmd_prepare_commit,
2344         .del_transaction   = replmd_del_transaction,
2345 };