86f77b12e87d55d5c217db3c7e8049d6c8f315ab
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         struct la_entry *la_list;
51         uint32_t num_ncs;
52         struct nc_entry {
53                 struct ldb_dn *dn;
54                 struct GUID guid;
55                 uint64_t mod_usn;
56                 struct dsdb_control_current_partition *p_ctrl;
57         } *ncs;
58 };
59
/*
 * Node of a doubly linked list of linked-attribute records.
 */
struct la_entry {
        struct la_entry *next;
        struct la_entry *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
};
64
/*
 * Per-request context, created by replmd_ctx_init() as a talloc child
 * of the originating request.
 */
struct replmd_replicated_request {
        /* the module instance handling the request */
        struct ldb_module *module;
        /* the original request this context belongs to */
        struct ldb_request *req;

        /* schema in use for this request (set by the operation handler) */
        const struct dsdb_schema *schema;

        struct dsdb_extended_replicated_objects *objs;

        /* the controls we pass down */
        struct ldb_control **controls;

        uint32_t index_current;

        struct ldb_message *search_msg;
};
80
81
82 /*
83   initialise the module
84   allocate the private structure and build the list
85   of partition DNs for use by replmd_notify()
86  */
87 static int replmd_init(struct ldb_module *module)
88 {
89         struct replmd_private *replmd_private;
90         struct ldb_context *ldb = ldb_module_get_ctx(module);
91
92         replmd_private = talloc_zero(module, struct replmd_private);
93         if (replmd_private == NULL) {
94                 ldb_oom(ldb);
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97         ldb_module_set_private(module, replmd_private);
98
99         return ldb_next_init(module);
100 }
101
102
103 static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
104 {
105         return ldb_dn_compare(n1->dn, n2->dn);
106 }
107
108 /*
109   build the list of partition DNs for use by replmd_notify()
110  */
111 static int replmd_load_NCs(struct ldb_module *module)
112 {
113         const char *attrs[] = { "namingContexts", NULL };
114         struct ldb_result *res = NULL;
115         int i, ret;
116         TALLOC_CTX *tmp_ctx;
117         struct ldb_context *ldb;
118         struct ldb_message_element *el;
119         struct replmd_private *replmd_private = 
120                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
121
122         if (replmd_private->ncs != NULL) {
123                 return LDB_SUCCESS;
124         }
125
126         ldb = ldb_module_get_ctx(module);
127         tmp_ctx = talloc_new(module);
128
129         /* load the list of naming contexts */
130         ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
131                          LDB_SCOPE_BASE, attrs, NULL);
132         if (ret != LDB_SUCCESS ||
133             res->count != 1) {
134                 DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
135                 return LDB_ERR_OPERATIONS_ERROR;
136         }
137
138         el = ldb_msg_find_element(res->msgs[0], "namingContexts");
139         if (el == NULL) {
140                 DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
141                 return LDB_ERR_OPERATIONS_ERROR;                
142         }
143
144         replmd_private->num_ncs = el->num_values;
145         replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
146                                            replmd_private->num_ncs);
147         if (replmd_private->ncs == NULL) {
148                 ldb_oom(ldb);
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151
152         for (i=0; i<replmd_private->num_ncs; i++) {
153                 replmd_private->ncs[i].dn = 
154                         ldb_dn_from_ldb_val(replmd_private->ncs, 
155                                             ldb, &el->values[i]);
156                 replmd_private->ncs[i].mod_usn = 0;
157         }
158
159         talloc_free(res);
160
161         /* now find the GUIDs of each of those DNs */
162         for (i=0; i<replmd_private->num_ncs; i++) {
163                 const char *attrs2[] = { "objectGUID", NULL };
164                 ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
165                                  LDB_SCOPE_BASE, attrs2, NULL);
166                 if (ret != LDB_SUCCESS ||
167                     res->count != 1) {
168                         DEBUG(0,(__location__ ": Failed to load GUID for %s\n",
169                                  ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
170                         return LDB_ERR_OPERATIONS_ERROR;
171                 }
172                 replmd_private->ncs[i].guid = 
173                         samdb_result_guid(res->msgs[0], "objectGUID");
174                 talloc_free(res);
175         }       
176
177         /* sort the NCs into order, most to least specific */
178         qsort(replmd_private->ncs, replmd_private->num_ncs,
179               sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
180
181         
182         /* pre-create the partition control used in
183            replmd_notify_store() */
184         for (i=0; i<replmd_private->num_ncs; i++) {
185                 replmd_private->ncs[i].p_ctrl = talloc(replmd_private->ncs,
186                                                        struct dsdb_control_current_partition);
187                 if (replmd_private->ncs[i].p_ctrl == NULL) {
188                         ldb_oom(ldb);
189                         return LDB_ERR_OPERATIONS_ERROR;
190                 }
191                 replmd_private->ncs[i].p_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
192                 replmd_private->ncs[i].p_ctrl->dn = replmd_private->ncs[i].dn;
193         }
194       
195         talloc_free(tmp_ctx);
196         
197         return LDB_SUCCESS;
198 }
199
200
201 /*
202  * notify the repl task that a object has changed. The notifies are
203  * gathered up in the replmd_private structure then written to the
204  * @REPLCHANGED object in each partition during the prepare_commit
205  */
206 static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
207 {
208         int ret, i;
209         struct replmd_private *replmd_private = 
210                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
211
212         ret = replmd_load_NCs(module);
213         if (ret != LDB_SUCCESS) {
214                 return ret;
215         }
216
217         for (i=0; i<replmd_private->num_ncs; i++) {
218                 if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
219                         break;
220                 }
221         }
222         if (i == replmd_private->num_ncs) {
223                 DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
224                          ldb_dn_get_linearized(dn)));
225                 return LDB_ERR_OPERATIONS_ERROR;
226         }
227
228         if (uSN > replmd_private->ncs[i].mod_usn) {
229             replmd_private->ncs[i].mod_usn = uSN;
230         }
231
232         return LDB_SUCCESS;
233 }
234
235
236 /*
237  * update a @REPLCHANGED record in each partition if there have been
238  * any writes of replicated data in the partition
239  */
240 static int replmd_notify_store(struct ldb_module *module)
241 {
242         int ret, i;
243         struct replmd_private *replmd_private = 
244                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
245         struct ldb_context *ldb = ldb_module_get_ctx(module);
246
247         for (i=0; i<replmd_private->num_ncs; i++) {
248                 struct ldb_message *msg;
249                 struct ldb_request *req;
250
251                 if (replmd_private->ncs[i].mod_usn == 0) {
252                         /* this partition has not changed in this
253                            transaction */
254                         continue;
255                 }
256
257                 msg = ldb_msg_new(module);
258                 if (msg == NULL) {
259                         ldb_oom(ldb);
260                         return LDB_ERR_OPERATIONS_ERROR;
261                 }
262
263                 msg->dn = ldb_dn_new(msg, ldb, "@REPLCHANGED");
264                 if (msg->dn == NULL) {
265                         ldb_oom(ldb);
266                         talloc_free(msg);
267                         return LDB_ERR_OPERATIONS_ERROR;
268                 }
269
270                 ret = ldb_msg_add_fmt(msg, "uSNHighest", "%llu", 
271                                       (unsigned long long)replmd_private->ncs[i].mod_usn);
272                 if (ret != LDB_SUCCESS) {
273                         talloc_free(msg);
274                         return ret;
275                 }
276                 msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
277
278                 ret = ldb_build_mod_req(&req, ldb, msg,
279                                         msg,
280                                         NULL,
281                                         NULL, ldb_op_default_callback,
282                                         NULL);
283 again:
284                 if (ret != LDB_SUCCESS) {
285                         talloc_free(msg);
286                         return ret;
287                 }
288
289                 ret = ldb_request_add_control(req,
290                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
291                                               false, replmd_private->ncs[i].p_ctrl);
292                 if (ret != LDB_SUCCESS) {
293                         talloc_free(msg);
294                         return ret;
295                 }
296
297
298                 /* Run the new request */
299                 ret = ldb_next_request(module, req);
300
301                 if (ret == LDB_SUCCESS) {
302                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
303                 }
304                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
305                         ret = ldb_build_add_req(&req, ldb, msg,
306                                                 msg,
307                                                 NULL,
308                                                 NULL, ldb_op_default_callback,
309                                                 NULL);
310                         goto again;
311                 }
312
313                 talloc_free(msg);
314
315                 if (ret != LDB_SUCCESS) {
316                         return ret;
317                 }
318         }
319
320         return LDB_SUCCESS;
321 }
322
323
324 /*
325   created a replmd_replicated_request context
326  */
327 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
328                                                          struct ldb_request *req)
329 {
330         struct ldb_context *ldb;
331         struct replmd_replicated_request *ac;
332
333         ldb = ldb_module_get_ctx(module);
334
335         ac = talloc_zero(req, struct replmd_replicated_request);
336         if (ac == NULL) {
337                 ldb_oom(ldb);
338                 return NULL;
339         }
340
341         ac->module = module;
342         ac->req = req;
343         return ac;
344 }
345
346 /*
347   add a time element to a record
348 */
349 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
350 {
351         struct ldb_message_element *el;
352         char *s;
353
354         if (ldb_msg_find_element(msg, attr) != NULL) {
355                 return LDB_SUCCESS;
356         }
357
358         s = ldb_timestring(msg, t);
359         if (s == NULL) {
360                 return LDB_ERR_OPERATIONS_ERROR;
361         }
362
363         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
364                 return LDB_ERR_OPERATIONS_ERROR;
365         }
366
367         el = ldb_msg_find_element(msg, attr);
368         /* always set as replace. This works because on add ops, the flag
369            is ignored */
370         el->flags = LDB_FLAG_MOD_REPLACE;
371
372         return LDB_SUCCESS;
373 }
374
375 /*
376   add a uint64_t element to a record
377 */
378 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
379 {
380         struct ldb_message_element *el;
381
382         if (ldb_msg_find_element(msg, attr) != NULL) {
383                 return LDB_SUCCESS;
384         }
385
386         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
387                 return LDB_ERR_OPERATIONS_ERROR;
388         }
389
390         el = ldb_msg_find_element(msg, attr);
391         /* always set as replace. This works because on add ops, the flag
392            is ignored */
393         el->flags = LDB_FLAG_MOD_REPLACE;
394
395         return LDB_SUCCESS;
396 }
397
398 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
399                                                    const struct replPropertyMetaData1 *m2,
400                                                    const uint32_t *rdn_attid)
401 {
402         if (m1->attid == m2->attid) {
403                 return 0;
404         }
405
406         /*
407          * the rdn attribute should be at the end!
408          * so we need to return a value greater than zero
409          * which means m1 is greater than m2
410          */
411         if (m1->attid == *rdn_attid) {
412                 return 1;
413         }
414
415         /*
416          * the rdn attribute should be at the end!
417          * so we need to return a value less than zero
418          * which means m2 is greater than m1
419          */
420         if (m2->attid == *rdn_attid) {
421                 return -1;
422         }
423
424         return m1->attid - m2->attid;
425 }
426
427 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
428                                                  const uint32_t *rdn_attid)
429 {
430         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
431                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
432 }
433
434 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
435                                                  const struct ldb_message_element *e2,
436                                                  const struct dsdb_schema *schema)
437 {
438         const struct dsdb_attribute *a1;
439         const struct dsdb_attribute *a2;
440
441         /* 
442          * TODO: make this faster by caching the dsdb_attribute pointer
443          *       on the ldb_messag_element
444          */
445
446         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
447         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
448
449         /*
450          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
451          *       in the schema
452          */
453         if (!a1 || !a2) {
454                 return strcasecmp(e1->name, e2->name);
455         }
456
457         return a1->attributeID_id - a2->attributeID_id;
458 }
459
460 static void replmd_ldb_message_sort(struct ldb_message *msg,
461                                     const struct dsdb_schema *schema)
462 {
463         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
464                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
465 }
466
467 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
468 {
469         struct ldb_context *ldb;
470         struct replmd_replicated_request *ac;
471
472         ac = talloc_get_type(req->context, struct replmd_replicated_request);
473         ldb = ldb_module_get_ctx(ac->module);
474
475         if (!ares) {
476                 return ldb_module_done(ac->req, NULL, NULL,
477                                         LDB_ERR_OPERATIONS_ERROR);
478         }
479         if (ares->error != LDB_SUCCESS) {
480                 return ldb_module_done(ac->req, ares->controls,
481                                         ares->response, ares->error);
482         }
483
484         if (ares->type != LDB_REPLY_DONE) {
485                 ldb_set_errstring(ldb,
486                                   "invalid ldb_reply_type in callback");
487                 talloc_free(ares);
488                 return ldb_module_done(ac->req, NULL, NULL,
489                                         LDB_ERR_OPERATIONS_ERROR);
490         }
491
492         return ldb_module_done(ac->req, ares->controls,
493                                 ares->response, LDB_SUCCESS);
494 }
495
496 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
497 {
498         struct ldb_context *ldb;
499         struct replmd_replicated_request *ac;
500         const struct dsdb_schema *schema;
501         enum ndr_err_code ndr_err;
502         struct ldb_request *down_req;
503         struct ldb_message *msg;
504         const struct dsdb_attribute *rdn_attr = NULL;
505         struct GUID guid;
506         struct ldb_val guid_value;
507         struct replPropertyMetaDataBlob nmd;
508         struct ldb_val nmd_value;
509         uint64_t seq_num;
510         const struct GUID *our_invocation_id;
511         time_t t = time(NULL);
512         NTTIME now;
513         char *time_str;
514         int ret;
515         uint32_t i, ni=0;
516
517         /* do not manipulate our control entries */
518         if (ldb_dn_is_special(req->op.add.message->dn)) {
519                 return ldb_next_request(module, req);
520         }
521
522         ldb = ldb_module_get_ctx(module);
523
524         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
525
526         schema = dsdb_get_schema(ldb);
527         if (!schema) {
528                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
529                               "replmd_add: no dsdb_schema loaded");
530                 return LDB_ERR_CONSTRAINT_VIOLATION;
531         }
532
533         ac = replmd_ctx_init(module, req);
534         if (!ac) {
535                 return LDB_ERR_OPERATIONS_ERROR;
536         }
537
538         ac->schema = schema;
539
540         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
541                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
542                               "replmd_add: it's not allowed to add an object with objectGUID\n");
543                 return LDB_ERR_UNWILLING_TO_PERFORM;
544         }
545
546         /* Get a sequence number from the backend */
547         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
548         if (ret != LDB_SUCCESS) {
549                 return ret;
550         }
551
552         /* a new GUID */
553         guid = GUID_random();
554
555         /* get our invocationId */
556         our_invocation_id = samdb_ntds_invocation_id(ldb);
557         if (!our_invocation_id) {
558                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
559                               "replmd_add: unable to find invocationId\n");
560                 return LDB_ERR_OPERATIONS_ERROR;
561         }
562
563         /* we have to copy the message as the caller might have it as a const */
564         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
565         if (msg == NULL) {
566                 ldb_oom(ldb);
567                 return LDB_ERR_OPERATIONS_ERROR;
568         }
569
570         /* generated times */
571         unix_to_nt_time(&now, t);
572         time_str = ldb_timestring(msg, t);
573         if (!time_str) {
574                 return LDB_ERR_OPERATIONS_ERROR;
575         }
576
577         /* 
578          * remove autogenerated attributes
579          */
580         ldb_msg_remove_attr(msg, "whenCreated");
581         ldb_msg_remove_attr(msg, "whenChanged");
582         ldb_msg_remove_attr(msg, "uSNCreated");
583         ldb_msg_remove_attr(msg, "uSNChanged");
584         ldb_msg_remove_attr(msg, "replPropertyMetaData");
585
586         /*
587          * readd replicated attributes
588          */
589         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
590         if (ret != LDB_SUCCESS) {
591                 ldb_oom(ldb);
592                 return LDB_ERR_OPERATIONS_ERROR;
593         }
594
595         /* build the replication meta_data */
596         ZERO_STRUCT(nmd);
597         nmd.version             = 1;
598         nmd.ctr.ctr1.count      = msg->num_elements;
599         nmd.ctr.ctr1.array      = talloc_array(msg,
600                                                struct replPropertyMetaData1,
601                                                nmd.ctr.ctr1.count);
602         if (!nmd.ctr.ctr1.array) {
603                 ldb_oom(ldb);
604                 return LDB_ERR_OPERATIONS_ERROR;
605         }
606
607         for (i=0; i < msg->num_elements; i++) {
608                 struct ldb_message_element *e = &msg->elements[i];
609                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
610                 const struct dsdb_attribute *sa;
611
612                 if (e->name[0] == '@') continue;
613
614                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
615                 if (!sa) {
616                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
617                                       "replmd_add: attribute '%s' not defined in schema\n",
618                                       e->name);
619                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
620                 }
621
622                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
623                         /* if the attribute is not replicated (0x00000001)
624                          * or constructed (0x00000004) it has no metadata
625                          */
626                         continue;
627                 }
628
629                 m->attid                        = sa->attributeID_id;
630                 m->version                      = 1;
631                 m->originating_change_time      = now;
632                 m->originating_invocation_id    = *our_invocation_id;
633                 m->originating_usn              = seq_num;
634                 m->local_usn                    = seq_num;
635                 ni++;
636
637                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
638                         rdn_attr = sa;
639                 }
640         }
641
642         /* fix meta data count */
643         nmd.ctr.ctr1.count = ni;
644
645         /*
646          * sort meta data array, and move the rdn attribute entry to the end
647          */
648         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
649
650         /* generated NDR encoded values */
651         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
652                                        NULL,
653                                        &guid,
654                                        (ndr_push_flags_fn_t)ndr_push_GUID);
655         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
656                 ldb_oom(ldb);
657                 return LDB_ERR_OPERATIONS_ERROR;
658         }
659         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
660                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
661                                        &nmd,
662                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
663         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
664                 ldb_oom(ldb);
665                 return LDB_ERR_OPERATIONS_ERROR;
666         }
667
668         /*
669          * add the autogenerated values
670          */
671         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
672         if (ret != LDB_SUCCESS) {
673                 ldb_oom(ldb);
674                 return LDB_ERR_OPERATIONS_ERROR;
675         }
676         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
677         if (ret != LDB_SUCCESS) {
678                 ldb_oom(ldb);
679                 return LDB_ERR_OPERATIONS_ERROR;
680         }
681         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
682         if (ret != LDB_SUCCESS) {
683                 ldb_oom(ldb);
684                 return LDB_ERR_OPERATIONS_ERROR;
685         }
686         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
687         if (ret != LDB_SUCCESS) {
688                 ldb_oom(ldb);
689                 return LDB_ERR_OPERATIONS_ERROR;
690         }
691         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
692         if (ret != LDB_SUCCESS) {
693                 ldb_oom(ldb);
694                 return LDB_ERR_OPERATIONS_ERROR;
695         }
696
697         /*
698          * sort the attributes by attid before storing the object
699          */
700         replmd_ldb_message_sort(msg, schema);
701
702         ret = ldb_build_add_req(&down_req, ldb, ac,
703                                 msg,
704                                 req->controls,
705                                 ac, replmd_op_callback,
706                                 req);
707         if (ret != LDB_SUCCESS) {
708                 return ret;
709         }
710
711         ret = replmd_notify(module, msg->dn, seq_num);
712         if (ret != LDB_SUCCESS) {
713                 return ret;
714         }
715
716         /* go on with the call chain */
717         return ldb_next_request(module, down_req);
718 }
719
720
721 /*
722  * update the replPropertyMetaData for one element
723  */
724 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
725                                       struct ldb_message *msg,
726                                       struct ldb_message_element *el,
727                                       struct replPropertyMetaDataBlob *omd,
728                                       struct dsdb_schema *schema,
729                                       uint64_t *seq_num,
730                                       const struct GUID *our_invocation_id,
731                                       NTTIME now)
732 {
733         int i;
734         const struct dsdb_attribute *a;
735         struct replPropertyMetaData1 *md1;
736
737         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
738         if (a == NULL) {
739                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
740                          el->name));
741                 return LDB_ERR_OPERATIONS_ERROR;
742         }
743
744         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
745                 /* if the attribute is not replicated (0x00000001)
746                  * or constructed (0x00000004) it has no metadata
747                  */
748                 return LDB_SUCCESS;
749         }
750
751         for (i=0; i<omd->ctr.ctr1.count; i++) {
752                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
753         }
754         if (i == omd->ctr.ctr1.count) {
755                 /* we need to add a new one */
756                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
757                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
758                 if (omd->ctr.ctr1.array == NULL) {
759                         ldb_oom(ldb);
760                         return LDB_ERR_OPERATIONS_ERROR;
761                 }
762                 omd->ctr.ctr1.count++;
763                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
764         }
765
766         /* Get a new sequence number from the backend. We only do this
767          * if we have a change that requires a new
768          * replPropertyMetaData element 
769          */
770         if (*seq_num == 0) {
771                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
772                 if (ret != LDB_SUCCESS) {
773                         return LDB_ERR_OPERATIONS_ERROR;
774                 }
775         }
776
777         md1 = &omd->ctr.ctr1.array[i];
778         md1->version++;
779         md1->attid                     = a->attributeID_id;
780         md1->originating_change_time   = now;
781         md1->originating_invocation_id = *our_invocation_id;
782         md1->originating_usn           = *seq_num;
783         md1->local_usn                 = *seq_num;
784         
785         return LDB_SUCCESS;
786 }
787
788 /*
789  * update the replPropertyMetaData object each time we modify an
790  * object. This is needed for DRS replication, as the merge on the
791  * client is based on this object 
792  */
793 static int replmd_update_rpmd(struct ldb_module *module, 
794                               struct ldb_message *msg, uint64_t *seq_num)
795 {
796         const struct ldb_val *omd_value;
797         enum ndr_err_code ndr_err;
798         struct replPropertyMetaDataBlob omd;
799         int i;
800         struct dsdb_schema *schema;
801         time_t t = time(NULL);
802         NTTIME now;
803         const struct GUID *our_invocation_id;
804         int ret;
805         const char *attrs[] = { "replPropertyMetaData" , NULL };
806         struct ldb_result *res;
807         struct ldb_context *ldb;
808
809         ldb = ldb_module_get_ctx(module);
810
811         our_invocation_id = samdb_ntds_invocation_id(ldb);
812         if (!our_invocation_id) {
813                 /* this happens during an initial vampire while
814                    updating the schema */
815                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
816                 return LDB_SUCCESS;
817         }
818
819         unix_to_nt_time(&now, t);
820
821         /* search for the existing replPropertyMetaDataBlob */
822         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
823         if (ret != LDB_SUCCESS || res->count < 1) {
824                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
825                          ldb_dn_get_linearized(msg->dn)));
826                 return LDB_ERR_OPERATIONS_ERROR;
827         }
828                 
829
830         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
831         if (!omd_value) {
832                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
833                          ldb_dn_get_linearized(msg->dn)));
834                 return LDB_ERR_OPERATIONS_ERROR;
835         }
836
837         ndr_err = ndr_pull_struct_blob(omd_value, msg,
838                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
839                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
840         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
841                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
842                          ldb_dn_get_linearized(msg->dn)));
843                 return LDB_ERR_OPERATIONS_ERROR;
844         }
845
846         if (omd.version != 1) {
847                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
848                          omd.version, ldb_dn_get_linearized(msg->dn)));
849                 return LDB_ERR_OPERATIONS_ERROR;
850         }
851
852         schema = dsdb_get_schema(ldb);
853
854         for (i=0; i<msg->num_elements; i++) {
855                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
856                                                  our_invocation_id, now);
857                 if (ret != LDB_SUCCESS) {
858                         return ret;
859                 }
860         }
861
862         /*
863          * replmd_update_rpmd_element has done an update if the
864          * seq_num is set
865          */
866         if (*seq_num != 0) {
867                 struct ldb_val *md_value;
868                 struct ldb_message_element *el;
869
870                 md_value = talloc(msg, struct ldb_val);
871                 if (md_value == NULL) {
872                         ldb_oom(ldb);
873                         return LDB_ERR_OPERATIONS_ERROR;
874                 }
875
876                 ndr_err = ndr_push_struct_blob(md_value, msg, 
877                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
878                                                &omd,
879                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
880                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
881                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
882                                  ldb_dn_get_linearized(msg->dn)));
883                         return LDB_ERR_OPERATIONS_ERROR;
884                 }
885
886                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
887                 if (ret != LDB_SUCCESS) {
888                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
889                                  ldb_dn_get_linearized(msg->dn)));
890                         return ret;
891                 }
892
893                 ret = replmd_notify(module, msg->dn, *seq_num);
894                 if (ret != LDB_SUCCESS) {
895                         return ret;
896                 }
897
898                 el->num_values = 1;
899                 el->values = md_value;
900         }
901
902         return LDB_SUCCESS;     
903 }
904
905
906 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
907 {
908         struct ldb_context *ldb;
909         struct replmd_replicated_request *ac;
910         const struct dsdb_schema *schema;
911         struct ldb_request *down_req;
912         struct ldb_message *msg;
913         int ret;
914         time_t t = time(NULL);
915         uint64_t seq_num = 0;
916
917         /* do not manipulate our control entries */
918         if (ldb_dn_is_special(req->op.mod.message->dn)) {
919                 return ldb_next_request(module, req);
920         }
921
922         ldb = ldb_module_get_ctx(module);
923
924         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
925
926         schema = dsdb_get_schema(ldb);
927         if (!schema) {
928                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
929                               "replmd_modify: no dsdb_schema loaded");
930                 return LDB_ERR_CONSTRAINT_VIOLATION;
931         }
932
933         ac = replmd_ctx_init(module, req);
934         if (!ac) {
935                 return LDB_ERR_OPERATIONS_ERROR;
936         }
937
938         ac->schema = schema;
939
940         /* we have to copy the message as the caller might have it as a const */
941         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
942         if (msg == NULL) {
943                 talloc_free(ac);
944                 return LDB_ERR_OPERATIONS_ERROR;
945         }
946
947         /* TODO:
948          * - get the whole old object
949          * - if the old object doesn't exist report an error
950          * - give an error when a readonly attribute should
951          *   be modified
952          * - merge the changed into the old object
953          *   if the caller set values to the same value
954          *   ignore the attribute, return success when no
955          *   attribute was changed
956          */
957
958         ret = replmd_update_rpmd(module, msg, &seq_num);
959         if (ret != LDB_SUCCESS) {
960                 return ret;
961         }
962
963         /* TODO:
964          * - sort the attributes by attid with replmd_ldb_message_sort()
965          * - replace the old object with the newly constructed one
966          */
967
968         ret = ldb_build_mod_req(&down_req, ldb, ac,
969                                 msg,
970                                 req->controls,
971                                 ac, replmd_op_callback,
972                                 req);
973         if (ret != LDB_SUCCESS) {
974                 return ret;
975         }
976         talloc_steal(down_req, msg);
977
978         /* we only change whenChanged and uSNChanged if the seq_num
979            has changed */
980         if (seq_num != 0) {
981                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
982                         talloc_free(ac);
983                         return LDB_ERR_OPERATIONS_ERROR;
984                 }
985
986                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
987                         talloc_free(ac);
988                         return LDB_ERR_OPERATIONS_ERROR;
989                 }
990         }
991
992         /* go on with the call chain */
993         return ldb_next_request(module, down_req);
994 }
995
/*
 * Map an LDB error raised while applying replicated objects to the
 * value handed back to the caller. Currently a plain pass-through:
 * 'ar' is not consulted and 'ret' is returned unchanged.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        (void)ar;

        return ret;
}
1000
1001 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
1002 {
1003         int ret = LDB_ERR_OTHER;
1004         /* TODO: do some error mapping */
1005         return ret;
1006 }
1007
1008 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
1009
1010 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
1011                                                 struct ldb_reply *ares)
1012 {
1013         struct ldb_context *ldb;
1014         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1015                                                struct replmd_replicated_request);
1016         int ret;
1017
1018         ldb = ldb_module_get_ctx(ar->module);
1019
1020         if (!ares) {
1021                 return ldb_module_done(ar->req, NULL, NULL,
1022                                         LDB_ERR_OPERATIONS_ERROR);
1023         }
1024         if (ares->error != LDB_SUCCESS) {
1025                 return ldb_module_done(ar->req, ares->controls,
1026                                         ares->response, ares->error);
1027         }
1028
1029         if (ares->type != LDB_REPLY_DONE) {
1030                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1031                 return ldb_module_done(ar->req, NULL, NULL,
1032                                         LDB_ERR_OPERATIONS_ERROR);
1033         }
1034
1035         talloc_free(ares);
1036         ar->index_current++;
1037
1038         ret = replmd_replicated_apply_next(ar);
1039         if (ret != LDB_SUCCESS) {
1040                 return ldb_module_done(ar->req, NULL, NULL, ret);
1041         }
1042
1043         return LDB_SUCCESS;
1044 }
1045
1046 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
1047 {
1048         struct ldb_context *ldb;
1049         struct ldb_request *change_req;
1050         enum ndr_err_code ndr_err;
1051         struct ldb_message *msg;
1052         struct replPropertyMetaDataBlob *md;
1053         struct ldb_val md_value;
1054         uint32_t i;
1055         uint64_t seq_num;
1056         int ret;
1057
1058         /*
1059          * TODO: check if the parent object exist
1060          */
1061
1062         /*
1063          * TODO: handle the conflict case where an object with the
1064          *       same name exist
1065          */
1066
1067         ldb = ldb_module_get_ctx(ar->module);
1068         msg = ar->objs->objects[ar->index_current].msg;
1069         md = ar->objs->objects[ar->index_current].meta_data;
1070
1071         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1072         if (ret != LDB_SUCCESS) {
1073                 return replmd_replicated_request_error(ar, ret);
1074         }
1075
1076         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
1077         if (ret != LDB_SUCCESS) {
1078                 return replmd_replicated_request_error(ar, ret);
1079         }
1080
1081         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1082         if (ret != LDB_SUCCESS) {
1083                 return replmd_replicated_request_error(ar, ret);
1084         }
1085
1086         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
1087         if (ret != LDB_SUCCESS) {
1088                 return replmd_replicated_request_error(ar, ret);
1089         }
1090
1091         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1092         if (ret != LDB_SUCCESS) {
1093                 return replmd_replicated_request_error(ar, ret);
1094         }
1095
1096         ret = replmd_notify(ar->module, msg->dn, seq_num);
1097         if (ret != LDB_SUCCESS) {
1098                 return replmd_replicated_request_error(ar, ret);
1099         }
1100
1101         /*
1102          * the meta data array is already sorted by the caller
1103          */
1104         for (i=0; i < md->ctr.ctr1.count; i++) {
1105                 md->ctr.ctr1.array[i].local_usn = seq_num;
1106         }
1107         ndr_err = ndr_push_struct_blob(&md_value, msg, 
1108                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1109                                        md,
1110                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1111         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1112                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1113                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1114         }
1115         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
1116         if (ret != LDB_SUCCESS) {
1117                 return replmd_replicated_request_error(ar, ret);
1118         }
1119
1120         replmd_ldb_message_sort(msg, ar->schema);
1121
1122         if (DEBUGLVL(4)) {
1123                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
1124                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
1125                 talloc_free(s);
1126         }
1127
1128         ret = ldb_build_add_req(&change_req,
1129                                 ldb,
1130                                 ar,
1131                                 msg,
1132                                 ar->controls,
1133                                 ar,
1134                                 replmd_replicated_apply_add_callback,
1135                                 ar->req);
1136         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1137
1138         return ldb_next_request(ar->module, change_req);
1139 }
1140
1141 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
1142                                                          struct replPropertyMetaData1 *m2)
1143 {
1144         int ret;
1145
1146         if (m1->version != m2->version) {
1147                 return m1->version - m2->version;
1148         }
1149
1150         if (m1->originating_change_time != m2->originating_change_time) {
1151                 return m1->originating_change_time - m2->originating_change_time;
1152         }
1153
1154         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
1155         if (ret != 0) {
1156                 return ret;
1157         }
1158
1159         return m1->originating_usn - m2->originating_usn;
1160 }
1161
1162 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
1163                                                   struct ldb_reply *ares)
1164 {
1165         struct ldb_context *ldb;
1166         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1167                                                struct replmd_replicated_request);
1168         int ret;
1169
1170         ldb = ldb_module_get_ctx(ar->module);
1171
1172         if (!ares) {
1173                 return ldb_module_done(ar->req, NULL, NULL,
1174                                         LDB_ERR_OPERATIONS_ERROR);
1175         }
1176         if (ares->error != LDB_SUCCESS) {
1177                 return ldb_module_done(ar->req, ares->controls,
1178                                         ares->response, ares->error);
1179         }
1180
1181         if (ares->type != LDB_REPLY_DONE) {
1182                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1183                 return ldb_module_done(ar->req, NULL, NULL,
1184                                         LDB_ERR_OPERATIONS_ERROR);
1185         }
1186
1187         talloc_free(ares);
1188         ar->index_current++;
1189
1190         ret = replmd_replicated_apply_next(ar);
1191         if (ret != LDB_SUCCESS) {
1192                 return ldb_module_done(ar->req, NULL, NULL, ret);
1193         }
1194
1195         return LDB_SUCCESS;
1196 }
1197
1198 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
1199 {
1200         struct ldb_context *ldb;
1201         struct ldb_request *change_req;
1202         enum ndr_err_code ndr_err;
1203         struct ldb_message *msg;
1204         struct replPropertyMetaDataBlob *rmd;
1205         struct replPropertyMetaDataBlob omd;
1206         const struct ldb_val *omd_value;
1207         struct replPropertyMetaDataBlob nmd;
1208         struct ldb_val nmd_value;
1209         uint32_t i,j,ni=0;
1210         uint32_t removed_attrs = 0;
1211         uint64_t seq_num;
1212         int ret;
1213
1214         ldb = ldb_module_get_ctx(ar->module);
1215         msg = ar->objs->objects[ar->index_current].msg;
1216         rmd = ar->objs->objects[ar->index_current].meta_data;
1217         ZERO_STRUCT(omd);
1218         omd.version = 1;
1219
1220         /*
1221          * TODO: check repl data is correct after a rename
1222          */
1223         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
1224                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
1225                           ldb_dn_get_linearized(ar->search_msg->dn),
1226                           ldb_dn_get_linearized(msg->dn));
1227                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
1228                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
1229                                   ldb_dn_get_linearized(ar->search_msg->dn),
1230                                   ldb_dn_get_linearized(msg->dn),
1231                                   ldb_errstring(ldb));
1232                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
1233                 }
1234         }
1235
1236         /* find existing meta data */
1237         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
1238         if (omd_value) {
1239                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
1240                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
1241                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
1242                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1243                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1244                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1245                 }
1246
1247                 if (omd.version != 1) {
1248                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1249                 }
1250         }
1251
1252         ZERO_STRUCT(nmd);
1253         nmd.version = 1;
1254         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
1255         nmd.ctr.ctr1.array = talloc_array(ar,
1256                                           struct replPropertyMetaData1,
1257                                           nmd.ctr.ctr1.count);
1258         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1259
1260         /* first copy the old meta data */
1261         for (i=0; i < omd.ctr.ctr1.count; i++) {
1262                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1263                 ni++;
1264         }
1265
1266         /* now merge in the new meta data */
1267         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1268                 bool found = false;
1269
1270                 for (j=0; j < ni; j++) {
1271                         int cmp;
1272
1273                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1274                                 continue;
1275                         }
1276
1277                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1278                                                                             &nmd.ctr.ctr1.array[j]);
1279                         if (cmp > 0) {
1280                                 /* replace the entry */
1281                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1282                                 found = true;
1283                                 break;
1284                         }
1285
1286                         /* we don't want to apply this change so remove the attribute */
1287                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1288                         removed_attrs++;
1289
1290                         found = true;
1291                         break;
1292                 }
1293
1294                 if (found) continue;
1295
1296                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1297                 ni++;
1298         }
1299
1300         /*
1301          * finally correct the size of the meta_data array
1302          */
1303         nmd.ctr.ctr1.count = ni;
1304
1305         /*
1306          * the rdn attribute (the alias for the name attribute),
1307          * 'cn' for most objects is the last entry in the meta data array
1308          * we have stored
1309          *
1310          * sort the new meta data array
1311          */
1312         {
1313                 struct replPropertyMetaData1 *rdn_p;
1314                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1315
1316                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1317                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1318         }
1319
1320         /* create the meta data value */
1321         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1322                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1323                                        &nmd,
1324                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1325         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1326                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1327                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1328         }
1329
1330         /*
1331          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1332          */
1333         if (msg->num_elements == 0) {
1334                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1335                           ar->index_current);
1336
1337                 ar->index_current++;
1338                 return replmd_replicated_apply_next(ar);
1339         }
1340
1341         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1342                   ar->index_current, msg->num_elements);
1343
1344         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1345         if (ret != LDB_SUCCESS) {
1346                 return replmd_replicated_request_error(ar, ret);
1347         }
1348
1349         for (i=0; i<ni; i++) {
1350                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1351         }
1352
1353         /*
1354          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1355          * and replPopertyMetaData attributes
1356          */
1357         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1358         if (ret != LDB_SUCCESS) {
1359                 return replmd_replicated_request_error(ar, ret);
1360         }
1361         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1362         if (ret != LDB_SUCCESS) {
1363                 return replmd_replicated_request_error(ar, ret);
1364         }
1365         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1366         if (ret != LDB_SUCCESS) {
1367                 return replmd_replicated_request_error(ar, ret);
1368         }
1369
1370         replmd_ldb_message_sort(msg, ar->schema);
1371
1372         /* we want to replace the old values */
1373         for (i=0; i < msg->num_elements; i++) {
1374                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1375         }
1376
1377         ret = replmd_notify(ar->module, msg->dn, seq_num);
1378         if (ret != LDB_SUCCESS) {
1379                 return replmd_replicated_request_error(ar, ret);
1380         }
1381
1382         if (DEBUGLVL(4)) {
1383                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1384                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1385                 talloc_free(s);
1386         }
1387
1388         ret = ldb_build_mod_req(&change_req,
1389                                 ldb,
1390                                 ar,
1391                                 msg,
1392                                 ar->controls,
1393                                 ar,
1394                                 replmd_replicated_apply_merge_callback,
1395                                 ar->req);
1396         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1397
1398         return ldb_next_request(ar->module, change_req);
1399 }
1400
1401 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1402                                                    struct ldb_reply *ares)
1403 {
1404         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1405                                                struct replmd_replicated_request);
1406         int ret;
1407
1408         if (!ares) {
1409                 return ldb_module_done(ar->req, NULL, NULL,
1410                                         LDB_ERR_OPERATIONS_ERROR);
1411         }
1412         if (ares->error != LDB_SUCCESS &&
1413             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1414                 return ldb_module_done(ar->req, ares->controls,
1415                                         ares->response, ares->error);
1416         }
1417
1418         switch (ares->type) {
1419         case LDB_REPLY_ENTRY:
1420                 ar->search_msg = talloc_steal(ar, ares->message);
1421                 break;
1422
1423         case LDB_REPLY_REFERRAL:
1424                 /* we ignore referrals */
1425                 break;
1426
1427         case LDB_REPLY_DONE:
1428                 if (ar->search_msg != NULL) {
1429                         ret = replmd_replicated_apply_merge(ar);
1430                 } else {
1431                         ret = replmd_replicated_apply_add(ar);
1432                 }
1433                 if (ret != LDB_SUCCESS) {
1434                         return ldb_module_done(ar->req, NULL, NULL, ret);
1435                 }
1436         }
1437
1438         talloc_free(ares);
1439         return LDB_SUCCESS;
1440 }
1441
1442 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1443
1444 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1445 {
1446         struct ldb_context *ldb;
1447         int ret;
1448         char *tmp_str;
1449         char *filter;
1450         struct ldb_request *search_req;
1451
1452         if (ar->index_current >= ar->objs->num_objects) {
1453                 /* done with it, go to next stage */
1454                 return replmd_replicated_uptodate_vector(ar);
1455         }
1456
1457         ldb = ldb_module_get_ctx(ar->module);
1458         ar->search_msg = NULL;
1459
1460         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1461         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1462
1463         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1464         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1465         talloc_free(tmp_str);
1466
1467         ret = ldb_build_search_req(&search_req,
1468                                    ldb,
1469                                    ar,
1470                                    ar->objs->partition_dn,
1471                                    LDB_SCOPE_SUBTREE,
1472                                    filter,
1473                                    NULL,
1474                                    NULL,
1475                                    ar,
1476                                    replmd_replicated_apply_search_callback,
1477                                    ar->req);
1478         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1479
1480         return ldb_next_request(ar->module, search_req);
1481 }
1482
1483 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1484                                                       struct ldb_reply *ares)
1485 {
1486         struct ldb_context *ldb;
1487         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1488                                                struct replmd_replicated_request);
1489         ldb = ldb_module_get_ctx(ar->module);
1490
1491         if (!ares) {
1492                 return ldb_module_done(ar->req, NULL, NULL,
1493                                         LDB_ERR_OPERATIONS_ERROR);
1494         }
1495         if (ares->error != LDB_SUCCESS) {
1496                 return ldb_module_done(ar->req, ares->controls,
1497                                         ares->response, ares->error);
1498         }
1499
1500         if (ares->type != LDB_REPLY_DONE) {
1501                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1502                 return ldb_module_done(ar->req, NULL, NULL,
1503                                         LDB_ERR_OPERATIONS_ERROR);
1504         }
1505
1506         talloc_free(ares);
1507
1508         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1509 }
1510
1511 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1512                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1513 {
1514         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1515 }
1516
1517 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1518 {
1519         struct ldb_context *ldb;
1520         struct ldb_request *change_req;
1521         enum ndr_err_code ndr_err;
1522         struct ldb_message *msg;
1523         struct replUpToDateVectorBlob ouv;
1524         const struct ldb_val *ouv_value;
1525         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1526         struct replUpToDateVectorBlob nuv;
1527         struct ldb_val nuv_value;
1528         struct ldb_message_element *nuv_el = NULL;
1529         const struct GUID *our_invocation_id;
1530         struct ldb_message_element *orf_el = NULL;
1531         struct repsFromToBlob nrf;
1532         struct ldb_val *nrf_value = NULL;
1533         struct ldb_message_element *nrf_el = NULL;
1534         uint32_t i,j,ni=0;
1535         bool found = false;
1536         time_t t = time(NULL);
1537         NTTIME now;
1538         int ret;
1539
1540         ldb = ldb_module_get_ctx(ar->module);
1541         ruv = ar->objs->uptodateness_vector;
1542         ZERO_STRUCT(ouv);
1543         ouv.version = 2;
1544         ZERO_STRUCT(nuv);
1545         nuv.version = 2;
1546
1547         unix_to_nt_time(&now, t);
1548
1549         /*
1550          * first create the new replUpToDateVector
1551          */
1552         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1553         if (ouv_value) {
1554                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1555                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1556                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1557                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1558                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1559                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1560                 }
1561
1562                 if (ouv.version != 2) {
1563                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1564                 }
1565         }
1566
1567         /*
1568          * the new uptodateness vector will at least
1569          * contain 1 entry, one for the source_dsa
1570          *
1571          * plus optional values from our old vector and the one from the source_dsa
1572          */
1573         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1574         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1575         nuv.ctr.ctr2.cursors = talloc_array(ar,
1576                                             struct drsuapi_DsReplicaCursor2,
1577                                             nuv.ctr.ctr2.count);
1578         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1579
1580         /* first copy the old vector */
1581         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1582                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1583                 ni++;
1584         }
1585
1586         /* get our invocation_id if we have one already attached to the ldb */
1587         our_invocation_id = samdb_ntds_invocation_id(ldb);
1588
1589         /* merge in the source_dsa vector if available */
1590         for (i=0; (ruv && i < ruv->count); i++) {
1591                 found = false;
1592
1593                 if (our_invocation_id &&
1594                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1595                                our_invocation_id)) {
1596                         continue;
1597                 }
1598
1599                 for (j=0; j < ni; j++) {
1600                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1601                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1602                                 continue;
1603                         }
1604
1605                         found = true;
1606
1607                         /*
1608                          * we update only the highest_usn and not the latest_sync_success time,
1609                          * because the last success stands for direct replication
1610                          */
1611                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1612                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1613                         }
1614                         break;                  
1615                 }
1616
1617                 if (found) continue;
1618
1619                 /* if it's not there yet, add it */
1620                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1621                 ni++;
1622         }
1623
1624         /*
1625          * merge in the current highwatermark for the source_dsa
1626          */
1627         found = false;
1628         for (j=0; j < ni; j++) {
1629                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1630                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1631                         continue;
1632                 }
1633
1634                 found = true;
1635
1636                 /*
1637                  * here we update the highest_usn and last_sync_success time
1638                  * because we're directly replicating from the source_dsa
1639                  *
1640                  * and use the tmp_highest_usn because this is what we have just applied
1641                  * to our ldb
1642                  */
1643                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1644                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1645                 break;
1646         }
1647         if (!found) {
1648                 /*
1649                  * here we update the highest_usn and last_sync_success time
1650                  * because we're directly replicating from the source_dsa
1651                  *
1652                  * and use the tmp_highest_usn because this is what we have just applied
1653                  * to our ldb
1654                  */
1655                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1656                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1657                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1658                 ni++;
1659         }
1660
1661         /*
1662          * finally correct the size of the cursors array
1663          */
1664         nuv.ctr.ctr2.count = ni;
1665
1666         /*
1667          * sort the cursors
1668          */
1669         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1670               sizeof(struct drsuapi_DsReplicaCursor2),
1671               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1672
1673         /*
1674          * create the change ldb_message
1675          */
1676         msg = ldb_msg_new(ar);
1677         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1678         msg->dn = ar->search_msg->dn;
1679
1680         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1681                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1682                                        &nuv,
1683                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1684         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1685                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1686                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1687         }
1688         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1689         if (ret != LDB_SUCCESS) {
1690                 return replmd_replicated_request_error(ar, ret);
1691         }
1692         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1693
1694         /*
1695          * now create the new repsFrom value from the given repsFromTo1 structure
1696          */
1697         ZERO_STRUCT(nrf);
1698         nrf.version                                     = 1;
1699         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1700         /* and fix some values... */
1701         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1702         nrf.ctr.ctr1.last_success                       = now;
1703         nrf.ctr.ctr1.last_attempt                       = now;
1704         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1705         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1706
1707         /*
1708          * first see if we already have a repsFrom value for the current source dsa
1709          * if so we'll later replace this value
1710          */
1711         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1712         if (orf_el) {
1713                 for (i=0; i < orf_el->num_values; i++) {
1714                         struct repsFromToBlob *trf;
1715
1716                         trf = talloc(ar, struct repsFromToBlob);
1717                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1718
1719                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1720                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1721                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1722                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1723                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1724                         }
1725
1726                         if (trf->version != 1) {
1727                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1728                         }
1729
1730                         /*
1731                          * we compare the source dsa objectGUID not the invocation_id
1732                          * because we want only one repsFrom value per source dsa
1733                          * and when the invocation_id of the source dsa has changed we don't need 
1734                          * the old repsFrom with the old invocation_id
1735                          */
1736                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1737                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1738                                 talloc_free(trf);
1739                                 continue;
1740                         }
1741
1742                         talloc_free(trf);
1743                         nrf_value = &orf_el->values[i];
1744                         break;
1745                 }
1746
1747                 /*
1748                  * copy over all old values to the new ldb_message
1749                  */
1750                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1751                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1752                 *nrf_el = *orf_el;
1753         }
1754
1755         /*
1756          * if we haven't found an old repsFrom value for the current source dsa
1757          * we'll add a new value
1758          */
1759         if (!nrf_value) {
1760                 struct ldb_val zero_value;
1761                 ZERO_STRUCT(zero_value);
1762                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1763                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1764
1765                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1766         }
1767
1768         /* we now fill the value which is already attached to ldb_message */
1769         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1770                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1771                                        &nrf,
1772                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1773         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1774                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1775                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1776         }
1777
1778         /* 
1779          * the ldb_message_element for the attribute, has all the old values and the new one
1780          * so we'll replace the whole attribute with all values
1781          */
1782         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1783
1784         if (DEBUGLVL(4)) {
1785                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1786                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1787                 talloc_free(s);
1788         }
1789
1790         /* prepare the ldb_modify() request */
1791         ret = ldb_build_mod_req(&change_req,
1792                                 ldb,
1793                                 ar,
1794                                 msg,
1795                                 ar->controls,
1796                                 ar,
1797                                 replmd_replicated_uptodate_modify_callback,
1798                                 ar->req);
1799         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1800
1801         return ldb_next_request(ar->module, change_req);
1802 }
1803
1804 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1805                                                       struct ldb_reply *ares)
1806 {
1807         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1808                                                struct replmd_replicated_request);
1809         int ret;
1810
1811         if (!ares) {
1812                 return ldb_module_done(ar->req, NULL, NULL,
1813                                         LDB_ERR_OPERATIONS_ERROR);
1814         }
1815         if (ares->error != LDB_SUCCESS &&
1816             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1817                 return ldb_module_done(ar->req, ares->controls,
1818                                         ares->response, ares->error);
1819         }
1820
1821         switch (ares->type) {
1822         case LDB_REPLY_ENTRY:
1823                 ar->search_msg = talloc_steal(ar, ares->message);
1824                 break;
1825
1826         case LDB_REPLY_REFERRAL:
1827                 /* we ignore referrals */
1828                 break;
1829
1830         case LDB_REPLY_DONE:
1831                 if (ar->search_msg == NULL) {
1832                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1833                 } else {
1834                         ret = replmd_replicated_uptodate_modify(ar);
1835                 }
1836                 if (ret != LDB_SUCCESS) {
1837                         return ldb_module_done(ar->req, NULL, NULL, ret);
1838                 }
1839         }
1840
1841         talloc_free(ares);
1842         return LDB_SUCCESS;
1843 }
1844
1845
1846 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1847 {
1848         struct ldb_context *ldb;
1849         int ret;
1850         static const char *attrs[] = {
1851                 "replUpToDateVector",
1852                 "repsFrom",
1853                 NULL
1854         };
1855         struct ldb_request *search_req;
1856
1857         ldb = ldb_module_get_ctx(ar->module);
1858         ar->search_msg = NULL;
1859
1860         ret = ldb_build_search_req(&search_req,
1861                                    ldb,
1862                                    ar,
1863                                    ar->objs->partition_dn,
1864                                    LDB_SCOPE_BASE,
1865                                    "(objectClass=*)",
1866                                    attrs,
1867                                    NULL,
1868                                    ar,
1869                                    replmd_replicated_uptodate_search_callback,
1870                                    ar->req);
1871         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1872
1873         return ldb_next_request(ar->module, search_req);
1874 }
1875
1876
1877
1878 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1879 {
1880         struct ldb_context *ldb;
1881         struct dsdb_extended_replicated_objects *objs;
1882         struct replmd_replicated_request *ar;
1883         struct ldb_control **ctrls;
1884         int ret, i;
1885         struct dsdb_control_current_partition *partition_ctrl;
1886         struct replmd_private *replmd_private = 
1887                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1888
1889         ldb = ldb_module_get_ctx(module);
1890
1891         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1892
1893         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1894         if (!objs) {
1895                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1896                 return LDB_ERR_PROTOCOL_ERROR;
1897         }
1898
1899         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1900                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1901                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1902                 return LDB_ERR_PROTOCOL_ERROR;
1903         }
1904
1905         ar = replmd_ctx_init(module, req);
1906         if (!ar)
1907                 return LDB_ERR_OPERATIONS_ERROR;
1908
1909         ar->objs = objs;
1910         ar->schema = dsdb_get_schema(ldb);
1911         if (!ar->schema) {
1912                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1913                 talloc_free(ar);
1914                 return LDB_ERR_CONSTRAINT_VIOLATION;
1915         }
1916
1917         ctrls = req->controls;
1918
1919         if (req->controls) {
1920                 req->controls = talloc_memdup(ar, req->controls,
1921                                               talloc_get_size(req->controls));
1922                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1923         }
1924
1925         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1926         if (ret != LDB_SUCCESS) {
1927                 return ret;
1928         }
1929
1930         /*
1931           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1932           tells the partition module which partition this request is
1933           directed at. That is important as the partition roots appear
1934           twice in the directory, once as mount points in the top
1935           level store, and once as the roots of each partition. The
1936           replication code wants to operate on the root of the
1937           partitions, not the top level mount points
1938          */
1939         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1940         if (partition_ctrl == NULL) {
1941                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1942         }
1943         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1944         partition_ctrl->dn = objs->partition_dn;
1945
1946         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1947         if (ret != LDB_SUCCESS) {
1948                 return ret;
1949         }
1950
1951         ar->controls = req->controls;
1952         req->controls = ctrls;
1953
1954         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1955
1956         /* save away the linked attributes for the end of the
1957            transaction */
1958         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1959                 struct la_entry *la_entry;
1960
1961                 if (replmd_private->la_list) {
1962                         la_entry = talloc(replmd_private->la_list,
1963                                           struct la_entry);
1964                 } else {
1965                         la_entry = talloc(replmd_private,
1966                                           struct la_entry);
1967                 }
1968                 if (la_entry == NULL) {
1969                         ldb_oom(ldb);
1970                         return LDB_ERR_OPERATIONS_ERROR;
1971                 }
1972                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1973                 if (la_entry->la == NULL) {
1974                         talloc_free(la_entry);
1975                         ldb_oom(ldb);
1976                         return LDB_ERR_OPERATIONS_ERROR;
1977                 }
1978                 *la_entry->la = ar->objs->linked_attributes[i];
1979
1980                 /* we need to steal the non-scalars so they stay
1981                    around until the end of the transaction */
1982                 talloc_steal(la_entry->la, la_entry->la->identifier);
1983                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1984
1985                 DLIST_ADD(replmd_private->la_list, la_entry);
1986         }
1987
1988         return replmd_replicated_apply_next(ar);
1989 }
1990
1991 /*
1992   process one linked attribute structure
1993  */
1994 static int replmd_process_linked_attribute(struct ldb_module *module,
1995                                            struct la_entry *la_entry)
1996 {                                          
1997         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1998         struct ldb_context *ldb = ldb_module_get_ctx(module);
1999         struct drsuapi_DsReplicaObjectIdentifier3 target;
2000         struct ldb_message *msg;
2001         struct ldb_message_element *ret_el;
2002         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
2003         enum ndr_err_code ndr_err;
2004         char *target_dn;
2005         struct ldb_request *mod_req;
2006         int ret;
2007         const struct dsdb_attribute *attr;
2008
2009 /*
2010 linked_attributes[0]:                                                     
2011      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
2012         identifier               : *                                      
2013             identifier: struct drsuapi_DsReplicaObjectIdentifier          
2014                 __ndr_size               : 0x0000003a (58)                
2015                 __ndr_size_sid           : 0x00000000 (0)                 
2016                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
2017                 sid                      : S-0-0                               
2018                 __ndr_size_dn            : 0x00000000 (0)                      
2019                 dn                       : ''                                  
2020         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
2021         value: struct drsuapi_DsAttributeValue                                 
2022             __ndr_size               : 0x0000007e (126)                        
2023             blob                     : *                                       
2024                 blob                     : DATA_BLOB length=126                
2025         flags                    : 0x00000001 (1)                              
2026                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
2027         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
2028         meta_data: struct drsuapi_DsReplicaMetaData                            
2029             version                  : 0x00000015 (21)                         
2030             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
2031             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
2032             originating_usn          : 0x000000000001e19c (123292)             
2033      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
2034         __ndr_size               : 0x0000007e (126)                            
2035         __ndr_size_sid           : 0x0000001c (28)                             
2036         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
2037         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
2038         __ndr_size_dn            : 0x00000022 (34)                                
2039         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
2040  */
2041         if (DEBUGLVL(4)) {
2042                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
2043         }
2044         
2045         /* decode the target of the link */
2046         ndr_err = ndr_pull_struct_blob(la->value.blob, 
2047                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
2048                                        &target,
2049                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
2050         if (ndr_err != NDR_ERR_SUCCESS) {
2051                 DEBUG(0,("Unable to decode linked_attribute target\n"));
2052                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
2053                 talloc_free(tmp_ctx);
2054                 return LDB_ERR_OPERATIONS_ERROR;
2055         }
2056         if (DEBUGLVL(4)) {
2057                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
2058         }
2059
2060         /* construct a modify request for this attribute change */
2061         msg = ldb_msg_new(tmp_ctx);
2062         if (!msg) {
2063                 ldb_oom(ldb);
2064                 talloc_free(tmp_ctx);
2065                 return LDB_ERR_OPERATIONS_ERROR;
2066         }
2067
2068         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
2069                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
2070         if (ret != LDB_SUCCESS) {
2071                 talloc_free(tmp_ctx);
2072                 return ret;
2073         }
2074
2075         /* find the attribute being modified */
2076         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
2077         if (attr == NULL) {
2078                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
2079                 talloc_free(tmp_ctx);
2080                 return LDB_ERR_OPERATIONS_ERROR;
2081         }
2082
2083         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
2084                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2085                                         LDB_FLAG_MOD_ADD, &ret_el);
2086         } else {
2087                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
2088                                         LDB_FLAG_MOD_DELETE, &ret_el);
2089         }
2090         if (ret != LDB_SUCCESS) {
2091                 talloc_free(tmp_ctx);
2092                 return ret;
2093         }
2094         ret_el->values = talloc_array(msg, struct ldb_val, 1);
2095         if (!ret_el->values) {
2096                 ldb_oom(ldb);
2097                 talloc_free(tmp_ctx);
2098                 return LDB_ERR_OPERATIONS_ERROR;
2099         }
2100         ret_el->num_values = 1;
2101
2102         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
2103                                     GUID_string(tmp_ctx, &target.guid),
2104                                     dom_sid_string(tmp_ctx, &target.sid),
2105                                     target.dn);
2106         if (target_dn == NULL) {
2107                 ldb_oom(ldb);
2108                 talloc_free(tmp_ctx);
2109                 return LDB_ERR_OPERATIONS_ERROR;
2110         }
2111         ret_el->values[0] = data_blob_string_const(target_dn);
2112
2113         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
2114                                 msg,
2115                                 NULL,
2116                                 NULL, 
2117                                 ldb_op_default_callback,
2118                                 NULL);
2119         if (ret != LDB_SUCCESS) {
2120                 talloc_free(tmp_ctx);
2121                 return ret;
2122         }
2123         talloc_steal(mod_req, msg);
2124
2125         if (DEBUGLVL(4)) {
2126                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
2127                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
2128         }
2129
2130         /* Run the new request */
2131         ret = ldb_next_request(module, mod_req);
2132
2133         /* we need to wait for this to finish, as we are being called
2134            from the synchronous end_transaction hook of this module */
2135         if (ret == LDB_SUCCESS) {
2136                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
2137         }
2138
2139         if (ret != LDB_SUCCESS) {
2140                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
2141                           ldb_errstring(ldb),
2142                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
2143         }
2144         
2145         talloc_free(tmp_ctx);
2146
2147         return ret;     
2148 }
2149
2150 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
2151 {
2152         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
2153                 return replmd_extended_replicated_objects(module, req);
2154         }
2155
2156         return ldb_next_request(module, req);
2157 }
2158
2159
2160 /*
2161   we hook into the transaction operations to allow us to 
2162   perform the linked attribute updates at the end of the whole
2163   transaction. This allows a forward linked attribute to be created
2164   before the object is created. During a vampire, w2k8 sends us linked
2165   attributes before the objects they are part of.
2166  */
2167 static int replmd_start_transaction(struct ldb_module *module)
2168 {
2169         /* create our private structure for this transaction */
2170         int i;
2171         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
2172                                                                 struct replmd_private);
2173         talloc_free(replmd_private->la_list);
2174         replmd_private->la_list = NULL;
2175
2176         for (i=0; i<replmd_private->num_ncs; i++) {
2177                 replmd_private->ncs[i].mod_usn = 0;
2178         }
2179
2180         return ldb_next_start_trans(module);
2181 }
2182
2183 /*
2184   on prepare commit we loop over our queued la_context structures and
2185   apply each of them  
2186  */
2187 static int replmd_prepare_commit(struct ldb_module *module)
2188 {
2189         struct replmd_private *replmd_private = 
2190                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2191         struct la_entry *la, *prev;
2192         int ret;
2193
2194         /* walk the list backwards, to do the first entry first, as we
2195          * added the entries with DLIST_ADD() which puts them at the
2196          * start of the list */
2197         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
2198
2199         for (; la; la=prev) {
2200                 prev = la->prev;
2201                 DLIST_REMOVE(replmd_private->la_list, la);
2202                 ret = replmd_process_linked_attribute(module, la);
2203                 talloc_free(la);
2204                 if (ret != LDB_SUCCESS) {
2205                         return ret;
2206                 }
2207         }
2208
2209         talloc_free(replmd_private->la_list);
2210         replmd_private->la_list = NULL;
2211
2212         /* possibly change @REPLCHANGED */
2213         ret = replmd_notify_store(module);
2214         if (ret != LDB_SUCCESS) {
2215                 return ret;
2216         }
2217         
2218         return ldb_next_prepare_commit(module);
2219 }
2220
2221 static int replmd_del_transaction(struct ldb_module *module)
2222 {
2223         struct replmd_private *replmd_private = 
2224                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
2225         talloc_free(replmd_private->la_list);
2226         replmd_private->la_list = NULL;
2227         return ldb_next_del_trans(module);
2228 }
2229
2230
2231 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
2232         .name          = "repl_meta_data",
2233         .init_context      = replmd_init,
2234         .add               = replmd_add,
2235         .modify            = replmd_modify,
2236         .extended          = replmd_extended,
2237         .start_transaction = replmd_start_transaction,
2238         .prepare_commit    = replmd_prepare_commit,
2239         .del_transaction   = replmd_del_transaction,
2240 };