1242d1d0a361412e651fa9e452931b251f9d95d6
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         struct la_entry *la_list;
51 };
52
53 struct la_entry {
54         struct la_entry *next, *prev;
55         struct drsuapi_DsReplicaLinkedAttribute *la;
56 };
57
58 struct replmd_replicated_request {
59         struct ldb_module *module;
60         struct ldb_request *req;
61
62         const struct dsdb_schema *schema;
63
64         struct dsdb_extended_replicated_objects *objs;
65
66         /* the controls we pass down */
67         struct ldb_control **controls;
68
69         uint32_t index_current;
70
71         struct ldb_message *search_msg;
72 };
73
74 /*
75   created a replmd_replicated_request context
76  */
77 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
78                                                          struct ldb_request *req)
79 {
80         struct ldb_context *ldb;
81         struct replmd_replicated_request *ac;
82
83         ldb = ldb_module_get_ctx(module);
84
85         ac = talloc_zero(req, struct replmd_replicated_request);
86         if (ac == NULL) {
87                 ldb_oom(ldb);
88                 return NULL;
89         }
90
91         ac->module = module;
92         ac->req = req;
93         return ac;
94 }
95
96 /*
97   add a time element to a record
98 */
99 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
100 {
101         struct ldb_message_element *el;
102         char *s;
103
104         if (ldb_msg_find_element(msg, attr) != NULL) {
105                 return LDB_SUCCESS;
106         }
107
108         s = ldb_timestring(msg, t);
109         if (s == NULL) {
110                 return LDB_ERR_OPERATIONS_ERROR;
111         }
112
113         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
114                 return LDB_ERR_OPERATIONS_ERROR;
115         }
116
117         el = ldb_msg_find_element(msg, attr);
118         /* always set as replace. This works because on add ops, the flag
119            is ignored */
120         el->flags = LDB_FLAG_MOD_REPLACE;
121
122         return LDB_SUCCESS;
123 }
124
125 /*
126   add a uint64_t element to a record
127 */
128 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
129 {
130         struct ldb_message_element *el;
131
132         if (ldb_msg_find_element(msg, attr) != NULL) {
133                 return LDB_SUCCESS;
134         }
135
136         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         el = ldb_msg_find_element(msg, attr);
141         /* always set as replace. This works because on add ops, the flag
142            is ignored */
143         el->flags = LDB_FLAG_MOD_REPLACE;
144
145         return LDB_SUCCESS;
146 }
147
148 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
149                                                    const struct replPropertyMetaData1 *m2,
150                                                    const uint32_t *rdn_attid)
151 {
152         if (m1->attid == m2->attid) {
153                 return 0;
154         }
155
156         /*
157          * the rdn attribute should be at the end!
158          * so we need to return a value greater than zero
159          * which means m1 is greater than m2
160          */
161         if (m1->attid == *rdn_attid) {
162                 return 1;
163         }
164
165         /*
166          * the rdn attribute should be at the end!
167          * so we need to return a value less than zero
168          * which means m2 is greater than m1
169          */
170         if (m2->attid == *rdn_attid) {
171                 return -1;
172         }
173
174         return m1->attid - m2->attid;
175 }
176
177 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
178                                                  const uint32_t *rdn_attid)
179 {
180         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
181                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
182 }
183
184 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
185                                                  const struct ldb_message_element *e2,
186                                                  const struct dsdb_schema *schema)
187 {
188         const struct dsdb_attribute *a1;
189         const struct dsdb_attribute *a2;
190
191         /* 
192          * TODO: make this faster by caching the dsdb_attribute pointer
193          *       on the ldb_messag_element
194          */
195
196         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
197         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
198
199         /*
200          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
201          *       in the schema
202          */
203         if (!a1 || !a2) {
204                 return strcasecmp(e1->name, e2->name);
205         }
206
207         return a1->attributeID_id - a2->attributeID_id;
208 }
209
210 static void replmd_ldb_message_sort(struct ldb_message *msg,
211                                     const struct dsdb_schema *schema)
212 {
213         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
214                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
215 }
216
217 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
218 {
219         struct ldb_context *ldb;
220         struct replmd_replicated_request *ac;
221
222         ac = talloc_get_type(req->context, struct replmd_replicated_request);
223         ldb = ldb_module_get_ctx(ac->module);
224
225         if (!ares) {
226                 return ldb_module_done(ac->req, NULL, NULL,
227                                         LDB_ERR_OPERATIONS_ERROR);
228         }
229         if (ares->error != LDB_SUCCESS) {
230                 return ldb_module_done(ac->req, ares->controls,
231                                         ares->response, ares->error);
232         }
233
234         if (ares->type != LDB_REPLY_DONE) {
235                 ldb_set_errstring(ldb,
236                                   "invalid ldb_reply_type in callback");
237                 talloc_free(ares);
238                 return ldb_module_done(ac->req, NULL, NULL,
239                                         LDB_ERR_OPERATIONS_ERROR);
240         }
241
242         return ldb_module_done(ac->req, ares->controls,
243                                 ares->response, LDB_SUCCESS);
244 }
245
246 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
247 {
248         struct ldb_context *ldb;
249         struct replmd_replicated_request *ac;
250         const struct dsdb_schema *schema;
251         enum ndr_err_code ndr_err;
252         struct ldb_request *down_req;
253         struct ldb_message *msg;
254         const struct dsdb_attribute *rdn_attr = NULL;
255         struct GUID guid;
256         struct ldb_val guid_value;
257         struct replPropertyMetaDataBlob nmd;
258         struct ldb_val nmd_value;
259         uint64_t seq_num;
260         const struct GUID *our_invocation_id;
261         time_t t = time(NULL);
262         NTTIME now;
263         char *time_str;
264         int ret;
265         uint32_t i, ni=0;
266
267         /* do not manipulate our control entries */
268         if (ldb_dn_is_special(req->op.add.message->dn)) {
269                 return ldb_next_request(module, req);
270         }
271
272         ldb = ldb_module_get_ctx(module);
273
274         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
275
276         schema = dsdb_get_schema(ldb);
277         if (!schema) {
278                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
279                               "replmd_add: no dsdb_schema loaded");
280                 return LDB_ERR_CONSTRAINT_VIOLATION;
281         }
282
283         ac = replmd_ctx_init(module, req);
284         if (!ac) {
285                 return LDB_ERR_OPERATIONS_ERROR;
286         }
287
288         ac->schema = schema;
289
290         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
291                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
292                               "replmd_add: it's not allowed to add an object with objectGUID\n");
293                 return LDB_ERR_UNWILLING_TO_PERFORM;
294         }
295
296         /* Get a sequence number from the backend */
297         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
298         if (ret != LDB_SUCCESS) {
299                 return ret;
300         }
301
302         /* a new GUID */
303         guid = GUID_random();
304
305         /* get our invocationId */
306         our_invocation_id = samdb_ntds_invocation_id(ldb);
307         if (!our_invocation_id) {
308                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
309                               "replmd_add: unable to find invocationId\n");
310                 return LDB_ERR_OPERATIONS_ERROR;
311         }
312
313         /* we have to copy the message as the caller might have it as a const */
314         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
315         if (msg == NULL) {
316                 ldb_oom(ldb);
317                 return LDB_ERR_OPERATIONS_ERROR;
318         }
319
320         /* generated times */
321         unix_to_nt_time(&now, t);
322         time_str = ldb_timestring(msg, t);
323         if (!time_str) {
324                 return LDB_ERR_OPERATIONS_ERROR;
325         }
326
327         /* 
328          * remove autogenerated attributes
329          */
330         ldb_msg_remove_attr(msg, "whenCreated");
331         ldb_msg_remove_attr(msg, "whenChanged");
332         ldb_msg_remove_attr(msg, "uSNCreated");
333         ldb_msg_remove_attr(msg, "uSNChanged");
334         ldb_msg_remove_attr(msg, "replPropertyMetaData");
335
336         /*
337          * readd replicated attributes
338          */
339         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
340         if (ret != LDB_SUCCESS) {
341                 ldb_oom(ldb);
342                 return LDB_ERR_OPERATIONS_ERROR;
343         }
344
345         /* build the replication meta_data */
346         ZERO_STRUCT(nmd);
347         nmd.version             = 1;
348         nmd.ctr.ctr1.count      = msg->num_elements;
349         nmd.ctr.ctr1.array      = talloc_array(msg,
350                                                struct replPropertyMetaData1,
351                                                nmd.ctr.ctr1.count);
352         if (!nmd.ctr.ctr1.array) {
353                 ldb_oom(ldb);
354                 return LDB_ERR_OPERATIONS_ERROR;
355         }
356
357         for (i=0; i < msg->num_elements; i++) {
358                 struct ldb_message_element *e = &msg->elements[i];
359                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
360                 const struct dsdb_attribute *sa;
361
362                 if (e->name[0] == '@') continue;
363
364                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
365                 if (!sa) {
366                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
367                                       "replmd_add: attribute '%s' not defined in schema\n",
368                                       e->name);
369                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
370                 }
371
372                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
373                         /* if the attribute is not replicated (0x00000001)
374                          * or constructed (0x00000004) it has no metadata
375                          */
376                         continue;
377                 }
378
379                 m->attid                        = sa->attributeID_id;
380                 m->version                      = 1;
381                 m->originating_change_time      = now;
382                 m->originating_invocation_id    = *our_invocation_id;
383                 m->originating_usn              = seq_num;
384                 m->local_usn                    = seq_num;
385                 ni++;
386
387                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
388                         rdn_attr = sa;
389                 }
390         }
391
392         /* fix meta data count */
393         nmd.ctr.ctr1.count = ni;
394
395         /*
396          * sort meta data array, and move the rdn attribute entry to the end
397          */
398         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
399
400         /* generated NDR encoded values */
401         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
402                                        NULL,
403                                        &guid,
404                                        (ndr_push_flags_fn_t)ndr_push_GUID);
405         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
406                 ldb_oom(ldb);
407                 return LDB_ERR_OPERATIONS_ERROR;
408         }
409         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
410                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
411                                        &nmd,
412                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
413         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
414                 ldb_oom(ldb);
415                 return LDB_ERR_OPERATIONS_ERROR;
416         }
417
418         /*
419          * add the autogenerated values
420          */
421         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
422         if (ret != LDB_SUCCESS) {
423                 ldb_oom(ldb);
424                 return LDB_ERR_OPERATIONS_ERROR;
425         }
426         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
427         if (ret != LDB_SUCCESS) {
428                 ldb_oom(ldb);
429                 return LDB_ERR_OPERATIONS_ERROR;
430         }
431         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
432         if (ret != LDB_SUCCESS) {
433                 ldb_oom(ldb);
434                 return LDB_ERR_OPERATIONS_ERROR;
435         }
436         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
437         if (ret != LDB_SUCCESS) {
438                 ldb_oom(ldb);
439                 return LDB_ERR_OPERATIONS_ERROR;
440         }
441         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
442         if (ret != LDB_SUCCESS) {
443                 ldb_oom(ldb);
444                 return LDB_ERR_OPERATIONS_ERROR;
445         }
446
447         /*
448          * sort the attributes by attid before storing the object
449          */
450         replmd_ldb_message_sort(msg, schema);
451
452         ret = ldb_build_add_req(&down_req, ldb, ac,
453                                 msg,
454                                 req->controls,
455                                 ac, replmd_op_callback,
456                                 req);
457         if (ret != LDB_SUCCESS) {
458                 return ret;
459         }
460
461         /* go on with the call chain */
462         return ldb_next_request(module, down_req);
463 }
464
465
466 /*
467  * update the replPropertyMetaData for one element
468  */
469 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
470                                       struct ldb_message *msg,
471                                       struct ldb_message_element *el,
472                                       struct replPropertyMetaDataBlob *omd,
473                                       struct dsdb_schema *schema,
474                                       uint64_t *seq_num,
475                                       const struct GUID *our_invocation_id,
476                                       NTTIME now)
477 {
478         int i;
479         const struct dsdb_attribute *a;
480         struct replPropertyMetaData1 *md1;
481
482         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
483         if (a == NULL) {
484                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
485                          el->name));
486                 return LDB_ERR_OPERATIONS_ERROR;
487         }
488
489         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
490                 /* if the attribute is not replicated (0x00000001)
491                  * or constructed (0x00000004) it has no metadata
492                  */
493                 return LDB_SUCCESS;
494         }
495
496         for (i=0; i<omd->ctr.ctr1.count; i++) {
497                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
498         }
499         if (i == omd->ctr.ctr1.count) {
500                 /* we need to add a new one */
501                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
502                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
503                 if (omd->ctr.ctr1.array == NULL) {
504                         ldb_oom(ldb);
505                         return LDB_ERR_OPERATIONS_ERROR;
506                 }
507                 omd->ctr.ctr1.count++;
508                 ZERO_STRUCT(omd->ctr.ctr1.array[i]);
509         }
510
511         /* Get a new sequence number from the backend. We only do this
512          * if we have a change that requires a new
513          * replPropertyMetaData element 
514          */
515         if (*seq_num == 0) {
516                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
517                 if (ret != LDB_SUCCESS) {
518                         return LDB_ERR_OPERATIONS_ERROR;
519                 }
520         }
521
522         md1 = &omd->ctr.ctr1.array[i];
523         md1->version++;
524         md1->attid                     = a->attributeID_id;
525         md1->originating_change_time   = now;
526         md1->originating_invocation_id = *our_invocation_id;
527         md1->originating_usn           = *seq_num;
528         md1->local_usn                 = *seq_num;
529         
530         return LDB_SUCCESS;
531 }
532
533 /*
534  * update the replPropertyMetaData object each time we modify an
535  * object. This is needed for DRS replication, as the merge on the
536  * client is based on this object 
537  */
538 static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
539                               uint64_t *seq_num)
540 {
541         const struct ldb_val *omd_value;
542         enum ndr_err_code ndr_err;
543         struct replPropertyMetaDataBlob omd;
544         int i;
545         struct dsdb_schema *schema;
546         time_t t = time(NULL);
547         NTTIME now;
548         const struct GUID *our_invocation_id;
549         int ret;
550         const char *attrs[] = { "replPropertyMetaData" , NULL };
551         struct ldb_result *res;
552
553         our_invocation_id = samdb_ntds_invocation_id(ldb);
554         if (!our_invocation_id) {
555                 /* this happens during an initial vampire while
556                    updating the schema */
557                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
558                 return LDB_SUCCESS;
559         }
560
561         unix_to_nt_time(&now, t);
562
563         /* search for the existing replPropertyMetaDataBlob */
564         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
565         if (ret != LDB_SUCCESS || res->count < 1) {
566                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
567                          ldb_dn_get_linearized(msg->dn)));
568                 return LDB_ERR_OPERATIONS_ERROR;
569         }
570                 
571
572         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
573         if (!omd_value) {
574                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
575                          ldb_dn_get_linearized(msg->dn)));
576                 return LDB_ERR_OPERATIONS_ERROR;
577         }
578
579         ndr_err = ndr_pull_struct_blob(omd_value, msg,
580                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
581                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
582         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
583                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
584                          ldb_dn_get_linearized(msg->dn)));
585                 return LDB_ERR_OPERATIONS_ERROR;
586         }
587
588         if (omd.version != 1) {
589                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
590                          omd.version, ldb_dn_get_linearized(msg->dn)));
591                 return LDB_ERR_OPERATIONS_ERROR;
592         }
593
594         schema = dsdb_get_schema(ldb);
595
596         for (i=0; i<msg->num_elements; i++) {
597                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
598                                                  our_invocation_id, now);
599                 if (ret != LDB_SUCCESS) {
600                         return ret;
601                 }
602         }
603
604         /*
605          * replmd_update_rpmd_element has done an update if the
606          * seq_num is set
607          */
608         if (*seq_num != 0) {
609                 struct ldb_val *md_value;
610                 struct ldb_message_element *el;
611
612                 md_value = talloc(msg, struct ldb_val);
613                 if (md_value == NULL) {
614                         ldb_oom(ldb);
615                         return LDB_ERR_OPERATIONS_ERROR;
616                 }
617
618                 ndr_err = ndr_push_struct_blob(md_value, msg, 
619                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
620                                                &omd,
621                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
622                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
623                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
624                                  ldb_dn_get_linearized(msg->dn)));
625                         return LDB_ERR_OPERATIONS_ERROR;
626                 }
627
628                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
629                 if (ret != LDB_SUCCESS) {
630                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
631                                  ldb_dn_get_linearized(msg->dn)));
632                         return ret;
633                 }
634
635                 el->num_values = 1;
636                 el->values = md_value;
637         }
638
639         return LDB_SUCCESS;     
640 }
641
642
643 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
644 {
645         struct ldb_context *ldb;
646         struct replmd_replicated_request *ac;
647         const struct dsdb_schema *schema;
648         struct ldb_request *down_req;
649         struct ldb_message *msg;
650         int ret;
651         time_t t = time(NULL);
652         uint64_t seq_num = 0;
653
654         /* do not manipulate our control entries */
655         if (ldb_dn_is_special(req->op.mod.message->dn)) {
656                 return ldb_next_request(module, req);
657         }
658
659         ldb = ldb_module_get_ctx(module);
660
661         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
662
663         schema = dsdb_get_schema(ldb);
664         if (!schema) {
665                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
666                               "replmd_modify: no dsdb_schema loaded");
667                 return LDB_ERR_CONSTRAINT_VIOLATION;
668         }
669
670         ac = replmd_ctx_init(module, req);
671         if (!ac) {
672                 return LDB_ERR_OPERATIONS_ERROR;
673         }
674
675         ac->schema = schema;
676
677         /* we have to copy the message as the caller might have it as a const */
678         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
679         if (msg == NULL) {
680                 talloc_free(ac);
681                 return LDB_ERR_OPERATIONS_ERROR;
682         }
683
684         /* TODO:
685          * - get the whole old object
686          * - if the old object doesn't exist report an error
687          * - give an error when a readonly attribute should
688          *   be modified
689          * - merge the changed into the old object
690          *   if the caller set values to the same value
691          *   ignore the attribute, return success when no
692          *   attribute was changed
693          */
694
695         ret = replmd_update_rpmd(ldb, msg, &seq_num);
696         if (ret != LDB_SUCCESS) {
697                 return ret;
698         }
699
700         /* TODO:
701          * - sort the attributes by attid with replmd_ldb_message_sort()
702          * - replace the old object with the newly constructed one
703          */
704
705         ret = ldb_build_mod_req(&down_req, ldb, ac,
706                                 msg,
707                                 req->controls,
708                                 ac, replmd_op_callback,
709                                 req);
710         if (ret != LDB_SUCCESS) {
711                 return ret;
712         }
713         talloc_steal(down_req, msg);
714
715         /* we only change whenChanged and uSNChanged if the seq_num
716            has changed */
717         if (seq_num != 0) {
718                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
719                         talloc_free(ac);
720                         return LDB_ERR_OPERATIONS_ERROR;
721                 }
722
723                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
724                         talloc_free(ac);
725                         return LDB_ERR_OPERATIONS_ERROR;
726                 }
727         }
728
729         /* go on with the call chain */
730         return ldb_next_request(module, down_req);
731 }
732
733 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
734 {
735         return ret;
736 }
737
738 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
739 {
740         int ret = LDB_ERR_OTHER;
741         /* TODO: do some error mapping */
742         return ret;
743 }
744
745 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
746
747 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
748                                                 struct ldb_reply *ares)
749 {
750         struct ldb_context *ldb;
751         struct replmd_replicated_request *ar = talloc_get_type(req->context,
752                                                struct replmd_replicated_request);
753         int ret;
754
755         ldb = ldb_module_get_ctx(ar->module);
756
757         if (!ares) {
758                 return ldb_module_done(ar->req, NULL, NULL,
759                                         LDB_ERR_OPERATIONS_ERROR);
760         }
761         if (ares->error != LDB_SUCCESS) {
762                 return ldb_module_done(ar->req, ares->controls,
763                                         ares->response, ares->error);
764         }
765
766         if (ares->type != LDB_REPLY_DONE) {
767                 ldb_set_errstring(ldb, "Invalid reply type\n!");
768                 return ldb_module_done(ar->req, NULL, NULL,
769                                         LDB_ERR_OPERATIONS_ERROR);
770         }
771
772         talloc_free(ares);
773         ar->index_current++;
774
775         ret = replmd_replicated_apply_next(ar);
776         if (ret != LDB_SUCCESS) {
777                 return ldb_module_done(ar->req, NULL, NULL, ret);
778         }
779
780         return LDB_SUCCESS;
781 }
782
783 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
784 {
785         struct ldb_context *ldb;
786         struct ldb_request *change_req;
787         enum ndr_err_code ndr_err;
788         struct ldb_message *msg;
789         struct replPropertyMetaDataBlob *md;
790         struct ldb_val md_value;
791         uint32_t i;
792         uint64_t seq_num;
793         int ret;
794
795         /*
796          * TODO: check if the parent object exist
797          */
798
799         /*
800          * TODO: handle the conflict case where an object with the
801          *       same name exist
802          */
803
804         ldb = ldb_module_get_ctx(ar->module);
805         msg = ar->objs->objects[ar->index_current].msg;
806         md = ar->objs->objects[ar->index_current].meta_data;
807
808         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
809         if (ret != LDB_SUCCESS) {
810                 return replmd_replicated_request_error(ar, ret);
811         }
812
813         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
814         if (ret != LDB_SUCCESS) {
815                 return replmd_replicated_request_error(ar, ret);
816         }
817
818         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
819         if (ret != LDB_SUCCESS) {
820                 return replmd_replicated_request_error(ar, ret);
821         }
822
823         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
824         if (ret != LDB_SUCCESS) {
825                 return replmd_replicated_request_error(ar, ret);
826         }
827
828         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
829         if (ret != LDB_SUCCESS) {
830                 return replmd_replicated_request_error(ar, ret);
831         }
832
833         /*
834          * the meta data array is already sorted by the caller
835          */
836         for (i=0; i < md->ctr.ctr1.count; i++) {
837                 md->ctr.ctr1.array[i].local_usn = seq_num;
838         }
839         ndr_err = ndr_push_struct_blob(&md_value, msg, 
840                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
841                                        md,
842                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
843         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
844                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
845                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
846         }
847         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
848         if (ret != LDB_SUCCESS) {
849                 return replmd_replicated_request_error(ar, ret);
850         }
851
852         replmd_ldb_message_sort(msg, ar->schema);
853
854         if (DEBUGLVL(4)) {
855                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
856                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
857                 talloc_free(s);
858         }
859
860         ret = ldb_build_add_req(&change_req,
861                                 ldb,
862                                 ar,
863                                 msg,
864                                 ar->controls,
865                                 ar,
866                                 replmd_replicated_apply_add_callback,
867                                 ar->req);
868         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
869
870         return ldb_next_request(ar->module, change_req);
871 }
872
873 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
874                                                          struct replPropertyMetaData1 *m2)
875 {
876         int ret;
877
878         if (m1->version != m2->version) {
879                 return m1->version - m2->version;
880         }
881
882         if (m1->originating_change_time != m2->originating_change_time) {
883                 return m1->originating_change_time - m2->originating_change_time;
884         }
885
886         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
887         if (ret != 0) {
888                 return ret;
889         }
890
891         return m1->originating_usn - m2->originating_usn;
892 }
893
894 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
895                                                   struct ldb_reply *ares)
896 {
897         struct ldb_context *ldb;
898         struct replmd_replicated_request *ar = talloc_get_type(req->context,
899                                                struct replmd_replicated_request);
900         int ret;
901
902         ldb = ldb_module_get_ctx(ar->module);
903
904         if (!ares) {
905                 return ldb_module_done(ar->req, NULL, NULL,
906                                         LDB_ERR_OPERATIONS_ERROR);
907         }
908         if (ares->error != LDB_SUCCESS) {
909                 return ldb_module_done(ar->req, ares->controls,
910                                         ares->response, ares->error);
911         }
912
913         if (ares->type != LDB_REPLY_DONE) {
914                 ldb_set_errstring(ldb, "Invalid reply type\n!");
915                 return ldb_module_done(ar->req, NULL, NULL,
916                                         LDB_ERR_OPERATIONS_ERROR);
917         }
918
919         talloc_free(ares);
920         ar->index_current++;
921
922         ret = replmd_replicated_apply_next(ar);
923         if (ret != LDB_SUCCESS) {
924                 return ldb_module_done(ar->req, NULL, NULL, ret);
925         }
926
927         return LDB_SUCCESS;
928 }
929
930 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
931 {
932         struct ldb_context *ldb;
933         struct ldb_request *change_req;
934         enum ndr_err_code ndr_err;
935         struct ldb_message *msg;
936         struct replPropertyMetaDataBlob *rmd;
937         struct replPropertyMetaDataBlob omd;
938         const struct ldb_val *omd_value;
939         struct replPropertyMetaDataBlob nmd;
940         struct ldb_val nmd_value;
941         uint32_t i,j,ni=0;
942         uint32_t removed_attrs = 0;
943         uint64_t seq_num;
944         int ret;
945
946         ldb = ldb_module_get_ctx(ar->module);
947         msg = ar->objs->objects[ar->index_current].msg;
948         rmd = ar->objs->objects[ar->index_current].meta_data;
949         ZERO_STRUCT(omd);
950         omd.version = 1;
951
952         /*
953          * TODO: check repl data is correct after a rename
954          */
955         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
956                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
957                           ldb_dn_get_linearized(ar->search_msg->dn),
958                           ldb_dn_get_linearized(msg->dn));
959                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
960                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
961                                   ldb_dn_get_linearized(ar->search_msg->dn),
962                                   ldb_dn_get_linearized(msg->dn),
963                                   ldb_errstring(ldb));
964                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
965                 }
966         }
967
968         /* find existing meta data */
969         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
970         if (omd_value) {
971                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
972                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
973                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
974                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
975                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
976                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
977                 }
978
979                 if (omd.version != 1) {
980                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
981                 }
982         }
983
984         ZERO_STRUCT(nmd);
985         nmd.version = 1;
986         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
987         nmd.ctr.ctr1.array = talloc_array(ar,
988                                           struct replPropertyMetaData1,
989                                           nmd.ctr.ctr1.count);
990         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
991
992         /* first copy the old meta data */
993         for (i=0; i < omd.ctr.ctr1.count; i++) {
994                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
995                 ni++;
996         }
997
998         /* now merge in the new meta data */
999         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1000                 bool found = false;
1001
1002                 for (j=0; j < ni; j++) {
1003                         int cmp;
1004
1005                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1006                                 continue;
1007                         }
1008
1009                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1010                                                                             &nmd.ctr.ctr1.array[j]);
1011                         if (cmp > 0) {
1012                                 /* replace the entry */
1013                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1014                                 found = true;
1015                                 break;
1016                         }
1017
1018                         /* we don't want to apply this change so remove the attribute */
1019                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1020                         removed_attrs++;
1021
1022                         found = true;
1023                         break;
1024                 }
1025
1026                 if (found) continue;
1027
1028                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1029                 ni++;
1030         }
1031
1032         /*
1033          * finally correct the size of the meta_data array
1034          */
1035         nmd.ctr.ctr1.count = ni;
1036
1037         /*
1038          * the rdn attribute (the alias for the name attribute),
1039          * 'cn' for most objects is the last entry in the meta data array
1040          * we have stored
1041          *
1042          * sort the new meta data array
1043          */
1044         {
1045                 struct replPropertyMetaData1 *rdn_p;
1046                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1047
1048                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1049                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1050         }
1051
1052         /* create the meta data value */
1053         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1054                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1055                                        &nmd,
1056                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1057         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1058                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1059                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1060         }
1061
1062         /*
1063          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1064          */
1065         if (msg->num_elements == 0) {
1066                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1067                           ar->index_current);
1068
1069                 ar->index_current++;
1070                 return replmd_replicated_apply_next(ar);
1071         }
1072
1073         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1074                   ar->index_current, msg->num_elements);
1075
1076         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1077         if (ret != LDB_SUCCESS) {
1078                 return replmd_replicated_request_error(ar, ret);
1079         }
1080
1081         for (i=0; i<ni; i++) {
1082                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1083         }
1084
1085         /*
1086          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1087          * and replPopertyMetaData attributes
1088          */
1089         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1090         if (ret != LDB_SUCCESS) {
1091                 return replmd_replicated_request_error(ar, ret);
1092         }
1093         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1094         if (ret != LDB_SUCCESS) {
1095                 return replmd_replicated_request_error(ar, ret);
1096         }
1097         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1098         if (ret != LDB_SUCCESS) {
1099                 return replmd_replicated_request_error(ar, ret);
1100         }
1101
1102         replmd_ldb_message_sort(msg, ar->schema);
1103
1104         /* we want to replace the old values */
1105         for (i=0; i < msg->num_elements; i++) {
1106                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1107         }
1108
1109         if (DEBUGLVL(4)) {
1110                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1111                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1112                 talloc_free(s);
1113         }
1114
1115         ret = ldb_build_mod_req(&change_req,
1116                                 ldb,
1117                                 ar,
1118                                 msg,
1119                                 ar->controls,
1120                                 ar,
1121                                 replmd_replicated_apply_merge_callback,
1122                                 ar->req);
1123         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1124
1125         return ldb_next_request(ar->module, change_req);
1126 }
1127
1128 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1129                                                    struct ldb_reply *ares)
1130 {
1131         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1132                                                struct replmd_replicated_request);
1133         int ret;
1134
1135         if (!ares) {
1136                 return ldb_module_done(ar->req, NULL, NULL,
1137                                         LDB_ERR_OPERATIONS_ERROR);
1138         }
1139         if (ares->error != LDB_SUCCESS &&
1140             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1141                 return ldb_module_done(ar->req, ares->controls,
1142                                         ares->response, ares->error);
1143         }
1144
1145         switch (ares->type) {
1146         case LDB_REPLY_ENTRY:
1147                 ar->search_msg = talloc_steal(ar, ares->message);
1148                 break;
1149
1150         case LDB_REPLY_REFERRAL:
1151                 /* we ignore referrals */
1152                 break;
1153
1154         case LDB_REPLY_DONE:
1155                 if (ar->search_msg != NULL) {
1156                         ret = replmd_replicated_apply_merge(ar);
1157                 } else {
1158                         ret = replmd_replicated_apply_add(ar);
1159                 }
1160                 if (ret != LDB_SUCCESS) {
1161                         return ldb_module_done(ar->req, NULL, NULL, ret);
1162                 }
1163         }
1164
1165         talloc_free(ares);
1166         return LDB_SUCCESS;
1167 }
1168
1169 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1170
1171 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1172 {
1173         struct ldb_context *ldb;
1174         int ret;
1175         char *tmp_str;
1176         char *filter;
1177         struct ldb_request *search_req;
1178
1179         if (ar->index_current >= ar->objs->num_objects) {
1180                 /* done with it, go to next stage */
1181                 return replmd_replicated_uptodate_vector(ar);
1182         }
1183
1184         ldb = ldb_module_get_ctx(ar->module);
1185         ar->search_msg = NULL;
1186
1187         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1188         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1189
1190         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1191         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1192         talloc_free(tmp_str);
1193
1194         ret = ldb_build_search_req(&search_req,
1195                                    ldb,
1196                                    ar,
1197                                    ar->objs->partition_dn,
1198                                    LDB_SCOPE_SUBTREE,
1199                                    filter,
1200                                    NULL,
1201                                    NULL,
1202                                    ar,
1203                                    replmd_replicated_apply_search_callback,
1204                                    ar->req);
1205         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1206
1207         return ldb_next_request(ar->module, search_req);
1208 }
1209
1210 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1211                                                       struct ldb_reply *ares)
1212 {
1213         struct ldb_context *ldb;
1214         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1215                                                struct replmd_replicated_request);
1216         ldb = ldb_module_get_ctx(ar->module);
1217
1218         if (!ares) {
1219                 return ldb_module_done(ar->req, NULL, NULL,
1220                                         LDB_ERR_OPERATIONS_ERROR);
1221         }
1222         if (ares->error != LDB_SUCCESS) {
1223                 return ldb_module_done(ar->req, ares->controls,
1224                                         ares->response, ares->error);
1225         }
1226
1227         if (ares->type != LDB_REPLY_DONE) {
1228                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1229                 return ldb_module_done(ar->req, NULL, NULL,
1230                                         LDB_ERR_OPERATIONS_ERROR);
1231         }
1232
1233         talloc_free(ares);
1234
1235         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1236 }
1237
1238 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1239                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1240 {
1241         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1242 }
1243
1244 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1245 {
1246         struct ldb_context *ldb;
1247         struct ldb_request *change_req;
1248         enum ndr_err_code ndr_err;
1249         struct ldb_message *msg;
1250         struct replUpToDateVectorBlob ouv;
1251         const struct ldb_val *ouv_value;
1252         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1253         struct replUpToDateVectorBlob nuv;
1254         struct ldb_val nuv_value;
1255         struct ldb_message_element *nuv_el = NULL;
1256         const struct GUID *our_invocation_id;
1257         struct ldb_message_element *orf_el = NULL;
1258         struct repsFromToBlob nrf;
1259         struct ldb_val *nrf_value = NULL;
1260         struct ldb_message_element *nrf_el = NULL;
1261         uint32_t i,j,ni=0;
1262         bool found = false;
1263         time_t t = time(NULL);
1264         NTTIME now;
1265         int ret;
1266
1267         ldb = ldb_module_get_ctx(ar->module);
1268         ruv = ar->objs->uptodateness_vector;
1269         ZERO_STRUCT(ouv);
1270         ouv.version = 2;
1271         ZERO_STRUCT(nuv);
1272         nuv.version = 2;
1273
1274         unix_to_nt_time(&now, t);
1275
1276         /*
1277          * first create the new replUpToDateVector
1278          */
1279         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1280         if (ouv_value) {
1281                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1282                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1283                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1284                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1285                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1286                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1287                 }
1288
1289                 if (ouv.version != 2) {
1290                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1291                 }
1292         }
1293
1294         /*
1295          * the new uptodateness vector will at least
1296          * contain 1 entry, one for the source_dsa
1297          *
1298          * plus optional values from our old vector and the one from the source_dsa
1299          */
1300         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1301         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1302         nuv.ctr.ctr2.cursors = talloc_array(ar,
1303                                             struct drsuapi_DsReplicaCursor2,
1304                                             nuv.ctr.ctr2.count);
1305         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1306
1307         /* first copy the old vector */
1308         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1309                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1310                 ni++;
1311         }
1312
1313         /* get our invocation_id if we have one already attached to the ldb */
1314         our_invocation_id = samdb_ntds_invocation_id(ldb);
1315
1316         /* merge in the source_dsa vector is available */
1317         for (i=0; (ruv && i < ruv->count); i++) {
1318                 found = false;
1319
1320                 if (our_invocation_id &&
1321                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1322                                our_invocation_id)) {
1323                         continue;
1324                 }
1325
1326                 for (j=0; j < ni; j++) {
1327                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1328                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1329                                 continue;
1330                         }
1331
1332                         found = true;
1333
1334                         /*
1335                          * we update only the highest_usn and not the latest_sync_success time,
1336                          * because the last success stands for direct replication
1337                          */
1338                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1339                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1340                         }
1341                         break;                  
1342                 }
1343
1344                 if (found) continue;
1345
1346                 /* if it's not there yet, add it */
1347                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1348                 ni++;
1349         }
1350
1351         /*
1352          * merge in the current highwatermark for the source_dsa
1353          */
1354         found = false;
1355         for (j=0; j < ni; j++) {
1356                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1357                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1358                         continue;
1359                 }
1360
1361                 found = true;
1362
1363                 /*
1364                  * here we update the highest_usn and last_sync_success time
1365                  * because we're directly replicating from the source_dsa
1366                  *
1367                  * and use the tmp_highest_usn because this is what we have just applied
1368                  * to our ldb
1369                  */
1370                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1371                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1372                 break;
1373         }
1374         if (!found) {
1375                 /*
1376                  * here we update the highest_usn and last_sync_success time
1377                  * because we're directly replicating from the source_dsa
1378                  *
1379                  * and use the tmp_highest_usn because this is what we have just applied
1380                  * to our ldb
1381                  */
1382                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1383                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1384                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1385                 ni++;
1386         }
1387
1388         /*
1389          * finally correct the size of the cursors array
1390          */
1391         nuv.ctr.ctr2.count = ni;
1392
1393         /*
1394          * sort the cursors
1395          */
1396         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1397               sizeof(struct drsuapi_DsReplicaCursor2),
1398               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1399
1400         /*
1401          * create the change ldb_message
1402          */
1403         msg = ldb_msg_new(ar);
1404         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1405         msg->dn = ar->search_msg->dn;
1406
1407         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1408                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1409                                        &nuv,
1410                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1411         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1412                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1413                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1414         }
1415         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1416         if (ret != LDB_SUCCESS) {
1417                 return replmd_replicated_request_error(ar, ret);
1418         }
1419         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1420
1421         /*
1422          * now create the new repsFrom value from the given repsFromTo1 structure
1423          */
1424         ZERO_STRUCT(nrf);
1425         nrf.version                                     = 1;
1426         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1427         /* and fix some values... */
1428         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1429         nrf.ctr.ctr1.last_success                       = now;
1430         nrf.ctr.ctr1.last_attempt                       = now;
1431         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1432         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1433
1434         /*
1435          * first see if we already have a repsFrom value for the current source dsa
1436          * if so we'll later replace this value
1437          */
1438         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1439         if (orf_el) {
1440                 for (i=0; i < orf_el->num_values; i++) {
1441                         struct repsFromToBlob *trf;
1442
1443                         trf = talloc(ar, struct repsFromToBlob);
1444                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1445
1446                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1447                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1448                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1449                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1450                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1451                         }
1452
1453                         if (trf->version != 1) {
1454                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1455                         }
1456
1457                         /*
1458                          * we compare the source dsa objectGUID not the invocation_id
1459                          * because we want only one repsFrom value per source dsa
1460                          * and when the invocation_id of the source dsa has changed we don't need 
1461                          * the old repsFrom with the old invocation_id
1462                          */
1463                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1464                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1465                                 talloc_free(trf);
1466                                 continue;
1467                         }
1468
1469                         talloc_free(trf);
1470                         nrf_value = &orf_el->values[i];
1471                         break;
1472                 }
1473
1474                 /*
1475                  * copy over all old values to the new ldb_message
1476                  */
1477                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1478                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1479                 *nrf_el = *orf_el;
1480         }
1481
1482         /*
1483          * if we haven't found an old repsFrom value for the current source dsa
1484          * we'll add a new value
1485          */
1486         if (!nrf_value) {
1487                 struct ldb_val zero_value;
1488                 ZERO_STRUCT(zero_value);
1489                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1490                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1491
1492                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1493         }
1494
1495         /* we now fill the value which is already attached to ldb_message */
1496         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1497                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1498                                        &nrf,
1499                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1500         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1501                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1502                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1503         }
1504
1505         /* 
1506          * the ldb_message_element for the attribute, has all the old values and the new one
1507          * so we'll replace the whole attribute with all values
1508          */
1509         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1510
1511         if (DEBUGLVL(4)) {
1512                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1513                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1514                 talloc_free(s);
1515         }
1516
1517         /* prepare the ldb_modify() request */
1518         ret = ldb_build_mod_req(&change_req,
1519                                 ldb,
1520                                 ar,
1521                                 msg,
1522                                 ar->controls,
1523                                 ar,
1524                                 replmd_replicated_uptodate_modify_callback,
1525                                 ar->req);
1526         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1527
1528         return ldb_next_request(ar->module, change_req);
1529 }
1530
1531 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1532                                                       struct ldb_reply *ares)
1533 {
1534         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1535                                                struct replmd_replicated_request);
1536         int ret;
1537
1538         if (!ares) {
1539                 return ldb_module_done(ar->req, NULL, NULL,
1540                                         LDB_ERR_OPERATIONS_ERROR);
1541         }
1542         if (ares->error != LDB_SUCCESS &&
1543             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1544                 return ldb_module_done(ar->req, ares->controls,
1545                                         ares->response, ares->error);
1546         }
1547
1548         switch (ares->type) {
1549         case LDB_REPLY_ENTRY:
1550                 ar->search_msg = talloc_steal(ar, ares->message);
1551                 break;
1552
1553         case LDB_REPLY_REFERRAL:
1554                 /* we ignore referrals */
1555                 break;
1556
1557         case LDB_REPLY_DONE:
1558                 if (ar->search_msg == NULL) {
1559                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1560                 } else {
1561                         ret = replmd_replicated_uptodate_modify(ar);
1562                 }
1563                 if (ret != LDB_SUCCESS) {
1564                         return ldb_module_done(ar->req, NULL, NULL, ret);
1565                 }
1566         }
1567
1568         talloc_free(ares);
1569         return LDB_SUCCESS;
1570 }
1571
1572
1573 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1574 {
1575         struct ldb_context *ldb;
1576         int ret;
1577         static const char *attrs[] = {
1578                 "replUpToDateVector",
1579                 "repsFrom",
1580                 NULL
1581         };
1582         struct ldb_request *search_req;
1583
1584         ldb = ldb_module_get_ctx(ar->module);
1585         ar->search_msg = NULL;
1586
1587         ret = ldb_build_search_req(&search_req,
1588                                    ldb,
1589                                    ar,
1590                                    ar->objs->partition_dn,
1591                                    LDB_SCOPE_BASE,
1592                                    "(objectClass=*)",
1593                                    attrs,
1594                                    NULL,
1595                                    ar,
1596                                    replmd_replicated_uptodate_search_callback,
1597                                    ar->req);
1598         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1599
1600         return ldb_next_request(ar->module, search_req);
1601 }
1602
1603
1604
1605 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1606 {
1607         struct ldb_context *ldb;
1608         struct dsdb_extended_replicated_objects *objs;
1609         struct replmd_replicated_request *ar;
1610         struct ldb_control **ctrls;
1611         int ret, i;
1612         struct dsdb_control_current_partition *partition_ctrl;
1613         struct replmd_private *replmd_private = 
1614                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1615
1616         ldb = ldb_module_get_ctx(module);
1617
1618         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1619
1620         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1621         if (!objs) {
1622                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1623                 return LDB_ERR_PROTOCOL_ERROR;
1624         }
1625
1626         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1627                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1628                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1629                 return LDB_ERR_PROTOCOL_ERROR;
1630         }
1631
1632         ar = replmd_ctx_init(module, req);
1633         if (!ar)
1634                 return LDB_ERR_OPERATIONS_ERROR;
1635
1636         ar->objs = objs;
1637         ar->schema = dsdb_get_schema(ldb);
1638         if (!ar->schema) {
1639                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1640                 talloc_free(ar);
1641                 return LDB_ERR_CONSTRAINT_VIOLATION;
1642         }
1643
1644         ctrls = req->controls;
1645
1646         if (req->controls) {
1647                 req->controls = talloc_memdup(ar, req->controls,
1648                                               talloc_get_size(req->controls));
1649                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1650         }
1651
1652         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1653         if (ret != LDB_SUCCESS) {
1654                 return ret;
1655         }
1656
1657         /*
1658           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1659           tells the partition module which partition this request is
1660           directed at. That is important as the partition roots appear
1661           twice in the directory, once as mount points in the top
1662           level store, and once as the roots of each partition. The
1663           replication code wants to operate on the root of the
1664           partitions, not the top level mount points
1665          */
1666         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1667         if (partition_ctrl == NULL) {
1668                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1669         }
1670         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1671         partition_ctrl->dn = objs->partition_dn;
1672
1673         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1674         if (ret != LDB_SUCCESS) {
1675                 return ret;
1676         }
1677
1678         ar->controls = req->controls;
1679         req->controls = ctrls;
1680
1681         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1682
1683         /* save away the linked attributes for the end of the
1684            transaction */
1685         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1686                 struct la_entry *la_entry;
1687
1688                 if (replmd_private == NULL) {
1689                         DEBUG(0,(__location__ ": repl_meta_data not called from within a transaction\n"));
1690                         return LDB_ERR_OPERATIONS_ERROR;
1691                 }
1692
1693                 la_entry = talloc(replmd_private, struct la_entry);
1694                 if (la_entry == NULL) {
1695                         ldb_oom(ldb);
1696                         return LDB_ERR_OPERATIONS_ERROR;
1697                 }
1698                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1699                 if (la_entry->la == NULL) {
1700                         ldb_oom(ldb);
1701                         return LDB_ERR_OPERATIONS_ERROR;
1702                 }
1703                 *la_entry->la = ar->objs->linked_attributes[i];
1704
1705                 /* we need to steal the non-scalars so they stay
1706                    around until the end of the transaction */
1707                 talloc_steal(la_entry->la, la_entry->la->identifier);
1708                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1709
1710                 DLIST_ADD(replmd_private->la_list, la_entry);
1711         }
1712
1713         return replmd_replicated_apply_next(ar);
1714 }
1715
1716 /*
1717   process one linked attribute structure
1718  */
1719 static int replmd_process_linked_attribute(struct ldb_module *module,
1720                                            struct la_entry *la_entry)
1721 {                                          
1722         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1723         struct ldb_context *ldb = ldb_module_get_ctx(module);
1724         struct drsuapi_DsReplicaObjectIdentifier3 target;
1725         struct ldb_message *msg;
1726         struct ldb_message_element *ret_el;
1727         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1728         enum ndr_err_code ndr_err;
1729         char *target_dn;
1730         struct ldb_request *mod_req;
1731         int ret;
1732         const struct dsdb_attribute *attr;
1733
1734 /*
1735 linked_attributes[0]:                                                     
1736      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1737         identifier               : *                                      
1738             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1739                 __ndr_size               : 0x0000003a (58)                
1740                 __ndr_size_sid           : 0x00000000 (0)                 
1741                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1742                 sid                      : S-0-0                               
1743                 __ndr_size_dn            : 0x00000000 (0)                      
1744                 dn                       : ''                                  
1745         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1746         value: struct drsuapi_DsAttributeValue                                 
1747             __ndr_size               : 0x0000007e (126)                        
1748             blob                     : *                                       
1749                 blob                     : DATA_BLOB length=126                
1750         flags                    : 0x00000001 (1)                              
1751                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1752         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1753         meta_data: struct drsuapi_DsReplicaMetaData                            
1754             version                  : 0x00000015 (21)                         
1755             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
1756             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
1757             originating_usn          : 0x000000000001e19c (123292)             
1758      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
1759         __ndr_size               : 0x0000007e (126)                            
1760         __ndr_size_sid           : 0x0000001c (28)                             
1761         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
1762         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
1763         __ndr_size_dn            : 0x00000022 (34)                                
1764         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
1765  */
1766         if (DEBUGLVL(4)) {
1767                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
1768         }
1769         
1770         /* decode the target of the link */
1771         ndr_err = ndr_pull_struct_blob(la->value.blob, 
1772                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1773                                        &target,
1774                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
1775         if (ndr_err != NDR_ERR_SUCCESS) {
1776                 DEBUG(0,("Unable to decode linked_attribute target\n"));
1777                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
1778                 talloc_free(tmp_ctx);
1779                 return LDB_ERR_OPERATIONS_ERROR;
1780         }
1781         if (DEBUGLVL(4)) {
1782                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
1783         }
1784
1785         /* construct a modify request for this attribute change */
1786         msg = ldb_msg_new(tmp_ctx);
1787         if (!msg) {
1788                 ldb_oom(ldb);
1789                 talloc_free(tmp_ctx);
1790                 return LDB_ERR_OPERATIONS_ERROR;
1791         }
1792
1793         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
1794                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
1795         if (ret != LDB_SUCCESS) {
1796                 talloc_free(tmp_ctx);
1797                 return ret;
1798         }
1799
1800         /* find the attribute being modified */
1801         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
1802         if (attr == NULL) {
1803                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
1804                 talloc_free(tmp_ctx);
1805                 return LDB_ERR_OPERATIONS_ERROR;
1806         }
1807
1808         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
1809                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1810                                         LDB_FLAG_MOD_ADD, &ret_el);
1811         } else {
1812                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1813                                         LDB_FLAG_MOD_DELETE, &ret_el);
1814         }
1815         if (ret != LDB_SUCCESS) {
1816                 talloc_free(tmp_ctx);
1817                 return ret;
1818         }
1819         ret_el->values = talloc_array(msg, struct ldb_val, 1);
1820         if (!ret_el->values) {
1821                 ldb_oom(ldb);
1822                 talloc_free(tmp_ctx);
1823                 return LDB_ERR_OPERATIONS_ERROR;
1824         }
1825         ret_el->num_values = 1;
1826
1827         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
1828                                     GUID_string(tmp_ctx, &target.guid),
1829                                     dom_sid_string(tmp_ctx, &target.sid),
1830                                     target.dn);
1831         if (target_dn == NULL) {
1832                 ldb_oom(ldb);
1833                 talloc_free(tmp_ctx);
1834                 return LDB_ERR_OPERATIONS_ERROR;
1835         }
1836         ret_el->values[0] = data_blob_string_const(target_dn);
1837
1838         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
1839                                 msg,
1840                                 NULL,
1841                                 NULL, 
1842                                 ldb_op_default_callback,
1843                                 NULL);
1844         if (ret != LDB_SUCCESS) {
1845                 talloc_free(tmp_ctx);
1846                 return ret;
1847         }
1848         talloc_steal(mod_req, msg);
1849
1850         if (DEBUGLVL(4)) {
1851                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
1852                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
1853         }
1854
1855         /* Run the new request */
1856         ret = ldb_next_request(module, mod_req);
1857
1858         /* we need to wait for this to finish, as we are being called
1859            from the synchronous end_transaction hook of this module */
1860         if (ret == LDB_SUCCESS) {
1861                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
1862         }
1863
1864         if (ret != LDB_SUCCESS) {
1865                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
1866                           ldb_errstring(ldb),
1867                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
1868         }
1869         
1870         talloc_free(tmp_ctx);
1871
1872         return ret;     
1873 }
1874
1875 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1876 {
1877         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1878                 return replmd_extended_replicated_objects(module, req);
1879         }
1880
1881         return ldb_next_request(module, req);
1882 }
1883
1884
1885 /*
1886   we hook into the transaction operations to allow us to 
1887   perform the linked attribute updates at the end of the whole
1888   transaction. This allows a forward linked attribute to be created
1889   before the object is created. During a vampire, w2k8 sends us linked
1890   attributes before the objects they are part of.
1891  */
1892 static int replmd_start_transaction(struct ldb_module *module)
1893 {
1894         /* create our private structure for this transaction */
1895         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
1896                                                                 struct replmd_private);
1897         talloc_free(replmd_private);
1898         replmd_private = talloc(module, struct replmd_private);
1899         if (replmd_private == NULL) {
1900                 return LDB_ERR_OPERATIONS_ERROR;
1901         }
1902         replmd_private->la_list = NULL;
1903         ldb_module_set_private(module, replmd_private);
1904         return ldb_next_start_trans(module);
1905 }
1906
1907 /*
1908   on prepare commit we loop over our queued la_context structures and
1909   apply each of them  
1910  */
1911 static int replmd_prepare_commit(struct ldb_module *module)
1912 {
1913         struct replmd_private *replmd_private = 
1914                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1915         struct la_entry *la;
1916
1917         /* walk the list backwards, to do the first entry first, as we
1918          * added the entries with DLIST_ADD() which puts them at the
1919          * start of the list */
1920         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
1921
1922         for (; la; la=la->prev) {
1923                 int ret;
1924                 ret = replmd_process_linked_attribute(module, la);
1925                 if (ret != LDB_SUCCESS) {
1926                         return ret;
1927                 }
1928         }
1929
1930         talloc_free(replmd_private);
1931         ldb_module_set_private(module, NULL);
1932         
1933         return ldb_next_prepare_commit(module);
1934 }
1935
1936 static int replmd_del_transaction(struct ldb_module *module)
1937 {
1938         struct replmd_private *replmd_private = 
1939                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1940         talloc_free(replmd_private);
1941         ldb_module_set_private(module, NULL);
1942         return ldb_next_del_trans(module);
1943 }
1944
1945
1946 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
1947         .name          = "repl_meta_data",
1948         .add           = replmd_add,
1949         .modify        = replmd_modify,
1950         .extended      = replmd_extended,
1951         .start_transaction = replmd_start_transaction,
1952         .prepare_commit    = replmd_prepare_commit,
1953         .del_transaction   = replmd_del_transaction,
1954 };