2a16c2bb82355aa5e149dc6a7cc7263e07f04238
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24  *  Name: ldb
25  *
26  *  Component: ldb repl_meta_data module
27  *
28  *  Description: - add a unique objectGUID onto every new record,
29  *               - handle whenCreated, whenChanged timestamps
30  *               - handle uSNCreated, uSNChanged numbers
31  *               - handle replPropertyMetaData attribute
32  *
33  *  Author: Simo Sorce
34  *  Author: Stefan Metzmacher
35  */
36
37 #include "includes.h"
38 #include "ldb_module.h"
39 #include "dsdb/samdb/samdb.h"
40 #include "dsdb/common/proto.h"
41 #include "../libds/common/flags.h"
42 #include "librpc/gen_ndr/ndr_misc.h"
43 #include "librpc/gen_ndr/ndr_drsuapi.h"
44 #include "librpc/gen_ndr/ndr_drsblobs.h"
45 #include "param/param.h"
46 #include "libcli/security/dom_sid.h"
47 #include "lib/util/dlinklist.h"
48
49 struct replmd_private {
50         struct la_entry *la_list;
51 };
52
53 struct la_entry {
54         struct la_entry *next, *prev;
55         struct drsuapi_DsReplicaLinkedAttribute *la;
56 };
57
58 struct replmd_replicated_request {
59         struct ldb_module *module;
60         struct ldb_request *req;
61
62         const struct dsdb_schema *schema;
63
64         struct dsdb_extended_replicated_objects *objs;
65
66         /* the controls we pass down */
67         struct ldb_control **controls;
68
69         uint32_t index_current;
70
71         struct ldb_message *search_msg;
72 };
73
74 /*
75   created a replmd_replicated_request context
76  */
77 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
78                                                          struct ldb_request *req)
79 {
80         struct ldb_context *ldb;
81         struct replmd_replicated_request *ac;
82
83         ldb = ldb_module_get_ctx(module);
84
85         ac = talloc_zero(req, struct replmd_replicated_request);
86         if (ac == NULL) {
87                 ldb_oom(ldb);
88                 return NULL;
89         }
90
91         ac->module = module;
92         ac->req = req;
93         return ac;
94 }
95
96 /*
97   add a time element to a record
98 */
99 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
100 {
101         struct ldb_message_element *el;
102         char *s;
103
104         if (ldb_msg_find_element(msg, attr) != NULL) {
105                 return LDB_SUCCESS;
106         }
107
108         s = ldb_timestring(msg, t);
109         if (s == NULL) {
110                 return LDB_ERR_OPERATIONS_ERROR;
111         }
112
113         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
114                 return LDB_ERR_OPERATIONS_ERROR;
115         }
116
117         el = ldb_msg_find_element(msg, attr);
118         /* always set as replace. This works because on add ops, the flag
119            is ignored */
120         el->flags = LDB_FLAG_MOD_REPLACE;
121
122         return LDB_SUCCESS;
123 }
124
125 /*
126   add a uint64_t element to a record
127 */
128 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
129 {
130         struct ldb_message_element *el;
131
132         if (ldb_msg_find_element(msg, attr) != NULL) {
133                 return LDB_SUCCESS;
134         }
135
136         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         el = ldb_msg_find_element(msg, attr);
141         /* always set as replace. This works because on add ops, the flag
142            is ignored */
143         el->flags = LDB_FLAG_MOD_REPLACE;
144
145         return LDB_SUCCESS;
146 }
147
148 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
149                                                    const struct replPropertyMetaData1 *m2,
150                                                    const uint32_t *rdn_attid)
151 {
152         if (m1->attid == m2->attid) {
153                 return 0;
154         }
155
156         /*
157          * the rdn attribute should be at the end!
158          * so we need to return a value greater than zero
159          * which means m1 is greater than m2
160          */
161         if (m1->attid == *rdn_attid) {
162                 return 1;
163         }
164
165         /*
166          * the rdn attribute should be at the end!
167          * so we need to return a value less than zero
168          * which means m2 is greater than m1
169          */
170         if (m2->attid == *rdn_attid) {
171                 return -1;
172         }
173
174         return m1->attid - m2->attid;
175 }
176
177 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
178                                                  const uint32_t *rdn_attid)
179 {
180         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
181                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
182 }
183
184 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
185                                                  const struct ldb_message_element *e2,
186                                                  const struct dsdb_schema *schema)
187 {
188         const struct dsdb_attribute *a1;
189         const struct dsdb_attribute *a2;
190
191         /* 
192          * TODO: make this faster by caching the dsdb_attribute pointer
193          *       on the ldb_messag_element
194          */
195
196         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
197         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
198
199         /*
200          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
201          *       in the schema
202          */
203         if (!a1 || !a2) {
204                 return strcasecmp(e1->name, e2->name);
205         }
206
207         return a1->attributeID_id - a2->attributeID_id;
208 }
209
210 static void replmd_ldb_message_sort(struct ldb_message *msg,
211                                     const struct dsdb_schema *schema)
212 {
213         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
214                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
215 }
216
217 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
218 {
219         struct ldb_context *ldb;
220         struct replmd_replicated_request *ac;
221
222         ac = talloc_get_type(req->context, struct replmd_replicated_request);
223         ldb = ldb_module_get_ctx(ac->module);
224
225         if (!ares) {
226                 return ldb_module_done(ac->req, NULL, NULL,
227                                         LDB_ERR_OPERATIONS_ERROR);
228         }
229         if (ares->error != LDB_SUCCESS) {
230                 return ldb_module_done(ac->req, ares->controls,
231                                         ares->response, ares->error);
232         }
233
234         if (ares->type != LDB_REPLY_DONE) {
235                 ldb_set_errstring(ldb,
236                                   "invalid ldb_reply_type in callback");
237                 talloc_free(ares);
238                 return ldb_module_done(ac->req, NULL, NULL,
239                                         LDB_ERR_OPERATIONS_ERROR);
240         }
241
242         return ldb_module_done(ac->req, ares->controls,
243                                 ares->response, LDB_SUCCESS);
244 }
245
246 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
247 {
248         struct ldb_context *ldb;
249         struct replmd_replicated_request *ac;
250         const struct dsdb_schema *schema;
251         enum ndr_err_code ndr_err;
252         struct ldb_request *down_req;
253         struct ldb_message *msg;
254         const struct dsdb_attribute *rdn_attr = NULL;
255         struct GUID guid;
256         struct ldb_val guid_value;
257         struct replPropertyMetaDataBlob nmd;
258         struct ldb_val nmd_value;
259         uint64_t seq_num;
260         const struct GUID *our_invocation_id;
261         time_t t = time(NULL);
262         NTTIME now;
263         char *time_str;
264         int ret;
265         uint32_t i, ni=0;
266
267         /* do not manipulate our control entries */
268         if (ldb_dn_is_special(req->op.add.message->dn)) {
269                 return ldb_next_request(module, req);
270         }
271
272         ldb = ldb_module_get_ctx(module);
273
274         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
275
276         schema = dsdb_get_schema(ldb);
277         if (!schema) {
278                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
279                               "replmd_add: no dsdb_schema loaded");
280                 return LDB_ERR_CONSTRAINT_VIOLATION;
281         }
282
283         ac = replmd_ctx_init(module, req);
284         if (!ac) {
285                 return LDB_ERR_OPERATIONS_ERROR;
286         }
287
288         ac->schema = schema;
289
290         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
291                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
292                               "replmd_add: it's not allowed to add an object with objectGUID\n");
293                 return LDB_ERR_UNWILLING_TO_PERFORM;
294         }
295
296         /* Get a sequence number from the backend */
297         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
298         if (ret != LDB_SUCCESS) {
299                 return ret;
300         }
301
302         /* a new GUID */
303         guid = GUID_random();
304
305         /* get our invocationId */
306         our_invocation_id = samdb_ntds_invocation_id(ldb);
307         if (!our_invocation_id) {
308                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
309                               "replmd_add: unable to find invocationId\n");
310                 return LDB_ERR_OPERATIONS_ERROR;
311         }
312
313         /* we have to copy the message as the caller might have it as a const */
314         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
315         if (msg == NULL) {
316                 ldb_oom(ldb);
317                 return LDB_ERR_OPERATIONS_ERROR;
318         }
319
320         /* generated times */
321         unix_to_nt_time(&now, t);
322         time_str = ldb_timestring(msg, t);
323         if (!time_str) {
324                 return LDB_ERR_OPERATIONS_ERROR;
325         }
326
327         /* 
328          * remove autogenerated attributes
329          */
330         ldb_msg_remove_attr(msg, "whenCreated");
331         ldb_msg_remove_attr(msg, "whenChanged");
332         ldb_msg_remove_attr(msg, "uSNCreated");
333         ldb_msg_remove_attr(msg, "uSNChanged");
334         ldb_msg_remove_attr(msg, "replPropertyMetaData");
335
336         /*
337          * readd replicated attributes
338          */
339         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
340         if (ret != LDB_SUCCESS) {
341                 ldb_oom(ldb);
342                 return LDB_ERR_OPERATIONS_ERROR;
343         }
344
345         /* build the replication meta_data */
346         ZERO_STRUCT(nmd);
347         nmd.version             = 1;
348         nmd.ctr.ctr1.count      = msg->num_elements;
349         nmd.ctr.ctr1.array      = talloc_array(msg,
350                                                struct replPropertyMetaData1,
351                                                nmd.ctr.ctr1.count);
352         if (!nmd.ctr.ctr1.array) {
353                 ldb_oom(ldb);
354                 return LDB_ERR_OPERATIONS_ERROR;
355         }
356
357         for (i=0; i < msg->num_elements; i++) {
358                 struct ldb_message_element *e = &msg->elements[i];
359                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
360                 const struct dsdb_attribute *sa;
361
362                 if (e->name[0] == '@') continue;
363
364                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
365                 if (!sa) {
366                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
367                                       "replmd_add: attribute '%s' not defined in schema\n",
368                                       e->name);
369                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
370                 }
371
372                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
373                         /* if the attribute is not replicated (0x00000001)
374                          * or constructed (0x00000004) it has no metadata
375                          */
376                         continue;
377                 }
378
379                 m->attid                        = sa->attributeID_id;
380                 m->version                      = 1;
381                 m->originating_change_time      = now;
382                 m->originating_invocation_id    = *our_invocation_id;
383                 m->originating_usn              = seq_num;
384                 m->local_usn                    = seq_num;
385                 ni++;
386
387                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
388                         rdn_attr = sa;
389                 }
390         }
391
392         /* fix meta data count */
393         nmd.ctr.ctr1.count = ni;
394
395         /*
396          * sort meta data array, and move the rdn attribute entry to the end
397          */
398         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
399
400         /* generated NDR encoded values */
401         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
402                                        NULL,
403                                        &guid,
404                                        (ndr_push_flags_fn_t)ndr_push_GUID);
405         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
406                 ldb_oom(ldb);
407                 return LDB_ERR_OPERATIONS_ERROR;
408         }
409         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
410                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
411                                        &nmd,
412                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
413         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
414                 ldb_oom(ldb);
415                 return LDB_ERR_OPERATIONS_ERROR;
416         }
417
418         /*
419          * add the autogenerated values
420          */
421         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
422         if (ret != LDB_SUCCESS) {
423                 ldb_oom(ldb);
424                 return LDB_ERR_OPERATIONS_ERROR;
425         }
426         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
427         if (ret != LDB_SUCCESS) {
428                 ldb_oom(ldb);
429                 return LDB_ERR_OPERATIONS_ERROR;
430         }
431         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
432         if (ret != LDB_SUCCESS) {
433                 ldb_oom(ldb);
434                 return LDB_ERR_OPERATIONS_ERROR;
435         }
436         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
437         if (ret != LDB_SUCCESS) {
438                 ldb_oom(ldb);
439                 return LDB_ERR_OPERATIONS_ERROR;
440         }
441         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
442         if (ret != LDB_SUCCESS) {
443                 ldb_oom(ldb);
444                 return LDB_ERR_OPERATIONS_ERROR;
445         }
446
447         /*
448          * sort the attributes by attid before storing the object
449          */
450         replmd_ldb_message_sort(msg, schema);
451
452         ret = ldb_build_add_req(&down_req, ldb, ac,
453                                 msg,
454                                 req->controls,
455                                 ac, replmd_op_callback,
456                                 req);
457         if (ret != LDB_SUCCESS) {
458                 return ret;
459         }
460
461         /* go on with the call chain */
462         return ldb_next_request(module, down_req);
463 }
464
465
466 /*
467  * update the replPropertyMetaData for one element
468  */
469 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
470                                       struct ldb_message *msg,
471                                       struct ldb_message_element *el,
472                                       struct replPropertyMetaDataBlob *omd,
473                                       struct dsdb_schema *schema,
474                                       uint64_t *seq_num,
475                                       const struct GUID *our_invocation_id,
476                                       NTTIME now)
477 {
478         int i;
479         const struct dsdb_attribute *a;
480         struct replPropertyMetaData1 *md1;
481
482         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
483         if (a == NULL) {
484                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
485                          el->name));
486                 return LDB_ERR_OPERATIONS_ERROR;
487         }
488
489         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
490                 /* if the attribute is not replicated (0x00000001)
491                  * or constructed (0x00000004) it has no metadata
492                  */
493                 return LDB_SUCCESS;
494         }
495
496         for (i=0; i<omd->ctr.ctr1.count; i++) {
497                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
498         }
499         if (i == omd->ctr.ctr1.count) {
500                 /* we need to add a new one */
501                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
502                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
503                 if (omd->ctr.ctr1.array == NULL) {
504                         ldb_oom(ldb);
505                         return LDB_ERR_OPERATIONS_ERROR;
506                 }
507                 omd->ctr.ctr1.count++;
508         }
509
510         /* Get a new sequence number from the backend. We only do this
511          * if we have a change that requires a new
512          * replPropertyMetaData element 
513          */
514         if (*seq_num == 0) {
515                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
516                 if (ret != LDB_SUCCESS) {
517                         return LDB_ERR_OPERATIONS_ERROR;
518                 }
519         }
520
521         md1 = &omd->ctr.ctr1.array[i];
522         md1->version                   = 1;
523         md1->attid                     = a->attributeID_id;
524         md1->originating_change_time   = now;
525         md1->originating_invocation_id = *our_invocation_id;
526         md1->originating_usn           = *seq_num;
527         md1->local_usn                 = *seq_num;
528         
529         return LDB_SUCCESS;
530 }
531
532 /*
533  * update the replPropertyMetaData object each time we modify an
534  * object. This is needed for DRS replication, as the merge on the
535  * client is based on this object 
536  */
537 static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
538                               uint64_t *seq_num)
539 {
540         const struct ldb_val *omd_value;
541         enum ndr_err_code ndr_err;
542         struct replPropertyMetaDataBlob omd;
543         int i;
544         struct dsdb_schema *schema;
545         time_t t = time(NULL);
546         NTTIME now;
547         const struct GUID *our_invocation_id;
548         int ret;
549         const char *attrs[] = { "replPropertyMetaData" , NULL };
550         struct ldb_result *res;
551
552         our_invocation_id = samdb_ntds_invocation_id(ldb);
553         if (!our_invocation_id) {
554                 /* this happens during an initial vampire while
555                    updating the schema */
556                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
557                 return LDB_SUCCESS;
558         }
559
560         unix_to_nt_time(&now, t);
561
562         /* search for the existing replPropertyMetaDataBlob */
563         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
564         if (ret != LDB_SUCCESS || res->count < 1) {
565                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
566                          ldb_dn_get_linearized(msg->dn)));
567                 return LDB_ERR_OPERATIONS_ERROR;
568         }
569                 
570
571         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
572         if (!omd_value) {
573                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
574                          ldb_dn_get_linearized(msg->dn)));
575                 return LDB_ERR_OPERATIONS_ERROR;
576         }
577
578         ndr_err = ndr_pull_struct_blob(omd_value, msg,
579                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
580                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
581         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
582                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
583                          ldb_dn_get_linearized(msg->dn)));
584                 return LDB_ERR_OPERATIONS_ERROR;
585         }
586
587         if (omd.version != 1) {
588                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
589                          omd.version, ldb_dn_get_linearized(msg->dn)));
590                 return LDB_ERR_OPERATIONS_ERROR;
591         }
592
593         schema = dsdb_get_schema(ldb);
594
595         for (i=0; i<msg->num_elements; i++) {
596                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
597                                                  our_invocation_id, now);
598                 if (ret != LDB_SUCCESS) {
599                         return ret;
600                 }
601         }
602
603         /*
604          * replmd_update_rpmd_element has done an update if the
605          * seq_num is set
606          */
607         if (*seq_num != 0) {
608                 struct ldb_val *md_value;
609                 struct ldb_message_element *el;
610
611                 md_value = talloc(msg, struct ldb_val);
612                 if (md_value == NULL) {
613                         ldb_oom(ldb);
614                         return LDB_ERR_OPERATIONS_ERROR;
615                 }
616
617                 ndr_err = ndr_push_struct_blob(md_value, msg, 
618                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
619                                                &omd,
620                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
621                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
622                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
623                                  ldb_dn_get_linearized(msg->dn)));
624                         return LDB_ERR_OPERATIONS_ERROR;
625                 }
626
627                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
628                 if (ret != LDB_SUCCESS) {
629                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
630                                  ldb_dn_get_linearized(msg->dn)));
631                         return ret;
632                 }
633
634                 el->num_values = 1;
635                 el->values = md_value;
636         }
637
638         return LDB_SUCCESS;     
639 }
640
641
642 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
643 {
644         struct ldb_context *ldb;
645         struct replmd_replicated_request *ac;
646         const struct dsdb_schema *schema;
647         struct ldb_request *down_req;
648         struct ldb_message *msg;
649         int ret;
650         time_t t = time(NULL);
651         uint64_t seq_num = 0;
652
653         /* do not manipulate our control entries */
654         if (ldb_dn_is_special(req->op.mod.message->dn)) {
655                 return ldb_next_request(module, req);
656         }
657
658         ldb = ldb_module_get_ctx(module);
659
660         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
661
662         schema = dsdb_get_schema(ldb);
663         if (!schema) {
664                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
665                               "replmd_modify: no dsdb_schema loaded");
666                 return LDB_ERR_CONSTRAINT_VIOLATION;
667         }
668
669         ac = replmd_ctx_init(module, req);
670         if (!ac) {
671                 return LDB_ERR_OPERATIONS_ERROR;
672         }
673
674         ac->schema = schema;
675
676         /* we have to copy the message as the caller might have it as a const */
677         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
678         if (msg == NULL) {
679                 talloc_free(ac);
680                 return LDB_ERR_OPERATIONS_ERROR;
681         }
682
683         /* TODO:
684          * - get the whole old object
685          * - if the old object doesn't exist report an error
686          * - give an error when a readonly attribute should
687          *   be modified
688          * - merge the changed into the old object
689          *   if the caller set values to the same value
690          *   ignore the attribute, return success when no
691          *   attribute was changed
692          */
693
694         ret = replmd_update_rpmd(ldb, msg, &seq_num);
695         if (ret != LDB_SUCCESS) {
696                 return ret;
697         }
698
699         /* TODO:
700          * - sort the attributes by attid with replmd_ldb_message_sort()
701          * - replace the old object with the newly constructed one
702          */
703
704         ret = ldb_build_mod_req(&down_req, ldb, ac,
705                                 msg,
706                                 req->controls,
707                                 ac, replmd_op_callback,
708                                 req);
709         if (ret != LDB_SUCCESS) {
710                 return ret;
711         }
712         talloc_steal(down_req, msg);
713
714         /* we only change whenChanged and uSNChanged if the seq_num
715            has changed */
716         if (seq_num != 0) {
717                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
718                         talloc_free(ac);
719                         return LDB_ERR_OPERATIONS_ERROR;
720                 }
721
722                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
723                         talloc_free(ac);
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726         }
727
728         /* go on with the call chain */
729         return ldb_next_request(module, down_req);
730 }
731
732 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
733 {
734         return ret;
735 }
736
737 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
738 {
739         int ret = LDB_ERR_OTHER;
740         /* TODO: do some error mapping */
741         return ret;
742 }
743
744 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
745
746 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
747                                                 struct ldb_reply *ares)
748 {
749         struct ldb_context *ldb;
750         struct replmd_replicated_request *ar = talloc_get_type(req->context,
751                                                struct replmd_replicated_request);
752         int ret;
753
754         ldb = ldb_module_get_ctx(ar->module);
755
756         if (!ares) {
757                 return ldb_module_done(ar->req, NULL, NULL,
758                                         LDB_ERR_OPERATIONS_ERROR);
759         }
760         if (ares->error != LDB_SUCCESS) {
761                 return ldb_module_done(ar->req, ares->controls,
762                                         ares->response, ares->error);
763         }
764
765         if (ares->type != LDB_REPLY_DONE) {
766                 ldb_set_errstring(ldb, "Invalid reply type\n!");
767                 return ldb_module_done(ar->req, NULL, NULL,
768                                         LDB_ERR_OPERATIONS_ERROR);
769         }
770
771         talloc_free(ares);
772         ar->index_current++;
773
774         ret = replmd_replicated_apply_next(ar);
775         if (ret != LDB_SUCCESS) {
776                 return ldb_module_done(ar->req, NULL, NULL, ret);
777         }
778
779         return LDB_SUCCESS;
780 }
781
782 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
783 {
784         struct ldb_context *ldb;
785         struct ldb_request *change_req;
786         enum ndr_err_code ndr_err;
787         struct ldb_message *msg;
788         struct replPropertyMetaDataBlob *md;
789         struct ldb_val md_value;
790         uint32_t i;
791         uint64_t seq_num;
792         int ret;
793
794         /*
795          * TODO: check if the parent object exist
796          */
797
798         /*
799          * TODO: handle the conflict case where an object with the
800          *       same name exist
801          */
802
803         ldb = ldb_module_get_ctx(ar->module);
804         msg = ar->objs->objects[ar->index_current].msg;
805         md = ar->objs->objects[ar->index_current].meta_data;
806
807         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
808         if (ret != LDB_SUCCESS) {
809                 return replmd_replicated_request_error(ar, ret);
810         }
811
812         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
813         if (ret != LDB_SUCCESS) {
814                 return replmd_replicated_request_error(ar, ret);
815         }
816
817         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
818         if (ret != LDB_SUCCESS) {
819                 return replmd_replicated_request_error(ar, ret);
820         }
821
822         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
823         if (ret != LDB_SUCCESS) {
824                 return replmd_replicated_request_error(ar, ret);
825         }
826
827         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
828         if (ret != LDB_SUCCESS) {
829                 return replmd_replicated_request_error(ar, ret);
830         }
831
832         /*
833          * the meta data array is already sorted by the caller
834          */
835         for (i=0; i < md->ctr.ctr1.count; i++) {
836                 md->ctr.ctr1.array[i].local_usn = seq_num;
837         }
838         ndr_err = ndr_push_struct_blob(&md_value, msg, 
839                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
840                                        md,
841                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
842         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
843                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
844                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
845         }
846         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
847         if (ret != LDB_SUCCESS) {
848                 return replmd_replicated_request_error(ar, ret);
849         }
850
851         replmd_ldb_message_sort(msg, ar->schema);
852
853         if (DEBUGLVL(4)) {
854                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
855                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
856                 talloc_free(s);
857         }
858
859         ret = ldb_build_add_req(&change_req,
860                                 ldb,
861                                 ar,
862                                 msg,
863                                 ar->controls,
864                                 ar,
865                                 replmd_replicated_apply_add_callback,
866                                 ar->req);
867         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
868
869         return ldb_next_request(ar->module, change_req);
870 }
871
872 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
873                                                          struct replPropertyMetaData1 *m2)
874 {
875         int ret;
876
877         if (m1->version != m2->version) {
878                 return m1->version - m2->version;
879         }
880
881         if (m1->originating_change_time != m2->originating_change_time) {
882                 return m1->originating_change_time - m2->originating_change_time;
883         }
884
885         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
886         if (ret != 0) {
887                 return ret;
888         }
889
890         return m1->originating_usn - m2->originating_usn;
891 }
892
893 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
894                                                   struct ldb_reply *ares)
895 {
896         struct ldb_context *ldb;
897         struct replmd_replicated_request *ar = talloc_get_type(req->context,
898                                                struct replmd_replicated_request);
899         int ret;
900
901         ldb = ldb_module_get_ctx(ar->module);
902
903         if (!ares) {
904                 return ldb_module_done(ar->req, NULL, NULL,
905                                         LDB_ERR_OPERATIONS_ERROR);
906         }
907         if (ares->error != LDB_SUCCESS) {
908                 return ldb_module_done(ar->req, ares->controls,
909                                         ares->response, ares->error);
910         }
911
912         if (ares->type != LDB_REPLY_DONE) {
913                 ldb_set_errstring(ldb, "Invalid reply type\n!");
914                 return ldb_module_done(ar->req, NULL, NULL,
915                                         LDB_ERR_OPERATIONS_ERROR);
916         }
917
918         talloc_free(ares);
919         ar->index_current++;
920
921         ret = replmd_replicated_apply_next(ar);
922         if (ret != LDB_SUCCESS) {
923                 return ldb_module_done(ar->req, NULL, NULL, ret);
924         }
925
926         return LDB_SUCCESS;
927 }
928
929 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
930 {
931         struct ldb_context *ldb;
932         struct ldb_request *change_req;
933         enum ndr_err_code ndr_err;
934         struct ldb_message *msg;
935         struct replPropertyMetaDataBlob *rmd;
936         struct replPropertyMetaDataBlob omd;
937         const struct ldb_val *omd_value;
938         struct replPropertyMetaDataBlob nmd;
939         struct ldb_val nmd_value;
940         uint32_t i,j,ni=0;
941         uint32_t removed_attrs = 0;
942         uint64_t seq_num;
943         int ret;
944
945         ldb = ldb_module_get_ctx(ar->module);
946         msg = ar->objs->objects[ar->index_current].msg;
947         rmd = ar->objs->objects[ar->index_current].meta_data;
948         ZERO_STRUCT(omd);
949         omd.version = 1;
950
951         /*
952          * TODO: check repl data is correct after a rename
953          */
954         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
955                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
956                           ldb_dn_get_linearized(ar->search_msg->dn),
957                           ldb_dn_get_linearized(msg->dn));
958                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
959                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
960                                   ldb_dn_get_linearized(ar->search_msg->dn),
961                                   ldb_dn_get_linearized(msg->dn),
962                                   ldb_errstring(ldb));
963                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
964                 }
965         }
966
967         /* find existing meta data */
968         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
969         if (omd_value) {
970                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
971                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
972                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
973                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
974                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
975                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
976                 }
977
978                 if (omd.version != 1) {
979                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
980                 }
981         }
982
983         ZERO_STRUCT(nmd);
984         nmd.version = 1;
985         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
986         nmd.ctr.ctr1.array = talloc_array(ar,
987                                           struct replPropertyMetaData1,
988                                           nmd.ctr.ctr1.count);
989         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
990
991         /* first copy the old meta data */
992         for (i=0; i < omd.ctr.ctr1.count; i++) {
993                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
994                 ni++;
995         }
996
997         /* now merge in the new meta data */
998         for (i=0; i < rmd->ctr.ctr1.count; i++) {
999                 bool found = false;
1000
1001                 for (j=0; j < ni; j++) {
1002                         int cmp;
1003
1004                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1005                                 continue;
1006                         }
1007
1008                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1009                                                                             &nmd.ctr.ctr1.array[j]);
1010                         if (cmp > 0) {
1011                                 /* replace the entry */
1012                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1013                                 found = true;
1014                                 break;
1015                         }
1016
1017                         /* we don't want to apply this change so remove the attribute */
1018                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1019                         removed_attrs++;
1020
1021                         found = true;
1022                         break;
1023                 }
1024
1025                 if (found) continue;
1026
1027                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1028                 ni++;
1029         }
1030
1031         /*
1032          * finally correct the size of the meta_data array
1033          */
1034         nmd.ctr.ctr1.count = ni;
1035
1036         /*
1037          * the rdn attribute (the alias for the name attribute),
1038          * 'cn' for most objects is the last entry in the meta data array
1039          * we have stored
1040          *
1041          * sort the new meta data array
1042          */
1043         {
1044                 struct replPropertyMetaData1 *rdn_p;
1045                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1046
1047                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1048                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1049         }
1050
1051         /* create the meta data value */
1052         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1053                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1054                                        &nmd,
1055                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1056         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1057                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1058                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1059         }
1060
1061         /*
1062          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1063          */
1064         if (msg->num_elements == 0) {
1065                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1066                           ar->index_current);
1067
1068                 ar->index_current++;
1069                 return replmd_replicated_apply_next(ar);
1070         }
1071
1072         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1073                   ar->index_current, msg->num_elements);
1074
1075         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
1076         if (ret != LDB_SUCCESS) {
1077                 return replmd_replicated_request_error(ar, ret);
1078         }
1079
1080         for (i=0; i<ni; i++) {
1081                 nmd.ctr.ctr1.array[i].local_usn = seq_num;
1082         }
1083
1084         /*
1085          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1086          * and replPopertyMetaData attributes
1087          */
1088         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1089         if (ret != LDB_SUCCESS) {
1090                 return replmd_replicated_request_error(ar, ret);
1091         }
1092         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1093         if (ret != LDB_SUCCESS) {
1094                 return replmd_replicated_request_error(ar, ret);
1095         }
1096         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1097         if (ret != LDB_SUCCESS) {
1098                 return replmd_replicated_request_error(ar, ret);
1099         }
1100
1101         replmd_ldb_message_sort(msg, ar->schema);
1102
1103         /* we want to replace the old values */
1104         for (i=0; i < msg->num_elements; i++) {
1105                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1106         }
1107
1108         if (DEBUGLVL(4)) {
1109                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1110                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1111                 talloc_free(s);
1112         }
1113
1114         ret = ldb_build_mod_req(&change_req,
1115                                 ldb,
1116                                 ar,
1117                                 msg,
1118                                 ar->controls,
1119                                 ar,
1120                                 replmd_replicated_apply_merge_callback,
1121                                 ar->req);
1122         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1123
1124         return ldb_next_request(ar->module, change_req);
1125 }
1126
1127 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1128                                                    struct ldb_reply *ares)
1129 {
1130         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1131                                                struct replmd_replicated_request);
1132         int ret;
1133
1134         if (!ares) {
1135                 return ldb_module_done(ar->req, NULL, NULL,
1136                                         LDB_ERR_OPERATIONS_ERROR);
1137         }
1138         if (ares->error != LDB_SUCCESS &&
1139             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1140                 return ldb_module_done(ar->req, ares->controls,
1141                                         ares->response, ares->error);
1142         }
1143
1144         switch (ares->type) {
1145         case LDB_REPLY_ENTRY:
1146                 ar->search_msg = talloc_steal(ar, ares->message);
1147                 break;
1148
1149         case LDB_REPLY_REFERRAL:
1150                 /* we ignore referrals */
1151                 break;
1152
1153         case LDB_REPLY_DONE:
1154                 if (ar->search_msg != NULL) {
1155                         ret = replmd_replicated_apply_merge(ar);
1156                 } else {
1157                         ret = replmd_replicated_apply_add(ar);
1158                 }
1159                 if (ret != LDB_SUCCESS) {
1160                         return ldb_module_done(ar->req, NULL, NULL, ret);
1161                 }
1162         }
1163
1164         talloc_free(ares);
1165         return LDB_SUCCESS;
1166 }
1167
1168 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1169
1170 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1171 {
1172         struct ldb_context *ldb;
1173         int ret;
1174         char *tmp_str;
1175         char *filter;
1176         struct ldb_request *search_req;
1177
1178         if (ar->index_current >= ar->objs->num_objects) {
1179                 /* done with it, go to next stage */
1180                 return replmd_replicated_uptodate_vector(ar);
1181         }
1182
1183         ldb = ldb_module_get_ctx(ar->module);
1184         ar->search_msg = NULL;
1185
1186         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1187         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1188
1189         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1190         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1191         talloc_free(tmp_str);
1192
1193         ret = ldb_build_search_req(&search_req,
1194                                    ldb,
1195                                    ar,
1196                                    ar->objs->partition_dn,
1197                                    LDB_SCOPE_SUBTREE,
1198                                    filter,
1199                                    NULL,
1200                                    NULL,
1201                                    ar,
1202                                    replmd_replicated_apply_search_callback,
1203                                    ar->req);
1204         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1205
1206         return ldb_next_request(ar->module, search_req);
1207 }
1208
1209 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1210                                                       struct ldb_reply *ares)
1211 {
1212         struct ldb_context *ldb;
1213         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1214                                                struct replmd_replicated_request);
1215         ldb = ldb_module_get_ctx(ar->module);
1216
1217         if (!ares) {
1218                 return ldb_module_done(ar->req, NULL, NULL,
1219                                         LDB_ERR_OPERATIONS_ERROR);
1220         }
1221         if (ares->error != LDB_SUCCESS) {
1222                 return ldb_module_done(ar->req, ares->controls,
1223                                         ares->response, ares->error);
1224         }
1225
1226         if (ares->type != LDB_REPLY_DONE) {
1227                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1228                 return ldb_module_done(ar->req, NULL, NULL,
1229                                         LDB_ERR_OPERATIONS_ERROR);
1230         }
1231
1232         talloc_free(ares);
1233
1234         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1235 }
1236
1237 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1238                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1239 {
1240         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1241 }
1242
1243 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1244 {
1245         struct ldb_context *ldb;
1246         struct ldb_request *change_req;
1247         enum ndr_err_code ndr_err;
1248         struct ldb_message *msg;
1249         struct replUpToDateVectorBlob ouv;
1250         const struct ldb_val *ouv_value;
1251         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1252         struct replUpToDateVectorBlob nuv;
1253         struct ldb_val nuv_value;
1254         struct ldb_message_element *nuv_el = NULL;
1255         const struct GUID *our_invocation_id;
1256         struct ldb_message_element *orf_el = NULL;
1257         struct repsFromToBlob nrf;
1258         struct ldb_val *nrf_value = NULL;
1259         struct ldb_message_element *nrf_el = NULL;
1260         uint32_t i,j,ni=0;
1261         bool found = false;
1262         time_t t = time(NULL);
1263         NTTIME now;
1264         int ret;
1265
1266         ldb = ldb_module_get_ctx(ar->module);
1267         ruv = ar->objs->uptodateness_vector;
1268         ZERO_STRUCT(ouv);
1269         ouv.version = 2;
1270         ZERO_STRUCT(nuv);
1271         nuv.version = 2;
1272
1273         unix_to_nt_time(&now, t);
1274
1275         /*
1276          * first create the new replUpToDateVector
1277          */
1278         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1279         if (ouv_value) {
1280                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1281                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1282                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1283                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1284                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1285                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1286                 }
1287
1288                 if (ouv.version != 2) {
1289                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1290                 }
1291         }
1292
1293         /*
1294          * the new uptodateness vector will at least
1295          * contain 1 entry, one for the source_dsa
1296          *
1297          * plus optional values from our old vector and the one from the source_dsa
1298          */
1299         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1300         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1301         nuv.ctr.ctr2.cursors = talloc_array(ar,
1302                                             struct drsuapi_DsReplicaCursor2,
1303                                             nuv.ctr.ctr2.count);
1304         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1305
1306         /* first copy the old vector */
1307         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1308                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1309                 ni++;
1310         }
1311
1312         /* get our invocation_id if we have one already attached to the ldb */
1313         our_invocation_id = samdb_ntds_invocation_id(ldb);
1314
1315         /* merge in the source_dsa vector is available */
1316         for (i=0; (ruv && i < ruv->count); i++) {
1317                 found = false;
1318
1319                 if (our_invocation_id &&
1320                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1321                                our_invocation_id)) {
1322                         continue;
1323                 }
1324
1325                 for (j=0; j < ni; j++) {
1326                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1327                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1328                                 continue;
1329                         }
1330
1331                         found = true;
1332
1333                         /*
1334                          * we update only the highest_usn and not the latest_sync_success time,
1335                          * because the last success stands for direct replication
1336                          */
1337                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1338                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1339                         }
1340                         break;                  
1341                 }
1342
1343                 if (found) continue;
1344
1345                 /* if it's not there yet, add it */
1346                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1347                 ni++;
1348         }
1349
1350         /*
1351          * merge in the current highwatermark for the source_dsa
1352          */
1353         found = false;
1354         for (j=0; j < ni; j++) {
1355                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1356                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1357                         continue;
1358                 }
1359
1360                 found = true;
1361
1362                 /*
1363                  * here we update the highest_usn and last_sync_success time
1364                  * because we're directly replicating from the source_dsa
1365                  *
1366                  * and use the tmp_highest_usn because this is what we have just applied
1367                  * to our ldb
1368                  */
1369                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1370                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1371                 break;
1372         }
1373         if (!found) {
1374                 /*
1375                  * here we update the highest_usn and last_sync_success time
1376                  * because we're directly replicating from the source_dsa
1377                  *
1378                  * and use the tmp_highest_usn because this is what we have just applied
1379                  * to our ldb
1380                  */
1381                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1382                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1383                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1384                 ni++;
1385         }
1386
1387         /*
1388          * finally correct the size of the cursors array
1389          */
1390         nuv.ctr.ctr2.count = ni;
1391
1392         /*
1393          * sort the cursors
1394          */
1395         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1396               sizeof(struct drsuapi_DsReplicaCursor2),
1397               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1398
1399         /*
1400          * create the change ldb_message
1401          */
1402         msg = ldb_msg_new(ar);
1403         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1404         msg->dn = ar->search_msg->dn;
1405
1406         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1407                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1408                                        &nuv,
1409                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1410         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1411                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1412                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1413         }
1414         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1415         if (ret != LDB_SUCCESS) {
1416                 return replmd_replicated_request_error(ar, ret);
1417         }
1418         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1419
1420         /*
1421          * now create the new repsFrom value from the given repsFromTo1 structure
1422          */
1423         ZERO_STRUCT(nrf);
1424         nrf.version                                     = 1;
1425         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1426         /* and fix some values... */
1427         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1428         nrf.ctr.ctr1.last_success                       = now;
1429         nrf.ctr.ctr1.last_attempt                       = now;
1430         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1431         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1432
1433         /*
1434          * first see if we already have a repsFrom value for the current source dsa
1435          * if so we'll later replace this value
1436          */
1437         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1438         if (orf_el) {
1439                 for (i=0; i < orf_el->num_values; i++) {
1440                         struct repsFromToBlob *trf;
1441
1442                         trf = talloc(ar, struct repsFromToBlob);
1443                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1444
1445                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1446                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1447                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1448                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1449                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1450                         }
1451
1452                         if (trf->version != 1) {
1453                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1454                         }
1455
1456                         /*
1457                          * we compare the source dsa objectGUID not the invocation_id
1458                          * because we want only one repsFrom value per source dsa
1459                          * and when the invocation_id of the source dsa has changed we don't need 
1460                          * the old repsFrom with the old invocation_id
1461                          */
1462                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1463                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1464                                 talloc_free(trf);
1465                                 continue;
1466                         }
1467
1468                         talloc_free(trf);
1469                         nrf_value = &orf_el->values[i];
1470                         break;
1471                 }
1472
1473                 /*
1474                  * copy over all old values to the new ldb_message
1475                  */
1476                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1477                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1478                 *nrf_el = *orf_el;
1479         }
1480
1481         /*
1482          * if we haven't found an old repsFrom value for the current source dsa
1483          * we'll add a new value
1484          */
1485         if (!nrf_value) {
1486                 struct ldb_val zero_value;
1487                 ZERO_STRUCT(zero_value);
1488                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1489                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1490
1491                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1492         }
1493
1494         /* we now fill the value which is already attached to ldb_message */
1495         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1496                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1497                                        &nrf,
1498                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1499         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1500                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1501                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1502         }
1503
1504         /* 
1505          * the ldb_message_element for the attribute, has all the old values and the new one
1506          * so we'll replace the whole attribute with all values
1507          */
1508         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1509
1510         if (DEBUGLVL(4)) {
1511                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1512                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1513                 talloc_free(s);
1514         }
1515
1516         /* prepare the ldb_modify() request */
1517         ret = ldb_build_mod_req(&change_req,
1518                                 ldb,
1519                                 ar,
1520                                 msg,
1521                                 ar->controls,
1522                                 ar,
1523                                 replmd_replicated_uptodate_modify_callback,
1524                                 ar->req);
1525         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1526
1527         return ldb_next_request(ar->module, change_req);
1528 }
1529
1530 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1531                                                       struct ldb_reply *ares)
1532 {
1533         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1534                                                struct replmd_replicated_request);
1535         int ret;
1536
1537         if (!ares) {
1538                 return ldb_module_done(ar->req, NULL, NULL,
1539                                         LDB_ERR_OPERATIONS_ERROR);
1540         }
1541         if (ares->error != LDB_SUCCESS &&
1542             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1543                 return ldb_module_done(ar->req, ares->controls,
1544                                         ares->response, ares->error);
1545         }
1546
1547         switch (ares->type) {
1548         case LDB_REPLY_ENTRY:
1549                 ar->search_msg = talloc_steal(ar, ares->message);
1550                 break;
1551
1552         case LDB_REPLY_REFERRAL:
1553                 /* we ignore referrals */
1554                 break;
1555
1556         case LDB_REPLY_DONE:
1557                 if (ar->search_msg == NULL) {
1558                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1559                 } else {
1560                         ret = replmd_replicated_uptodate_modify(ar);
1561                 }
1562                 if (ret != LDB_SUCCESS) {
1563                         return ldb_module_done(ar->req, NULL, NULL, ret);
1564                 }
1565         }
1566
1567         talloc_free(ares);
1568         return LDB_SUCCESS;
1569 }
1570
1571
1572 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1573 {
1574         struct ldb_context *ldb;
1575         int ret;
1576         static const char *attrs[] = {
1577                 "replUpToDateVector",
1578                 "repsFrom",
1579                 NULL
1580         };
1581         struct ldb_request *search_req;
1582
1583         ldb = ldb_module_get_ctx(ar->module);
1584         ar->search_msg = NULL;
1585
1586         ret = ldb_build_search_req(&search_req,
1587                                    ldb,
1588                                    ar,
1589                                    ar->objs->partition_dn,
1590                                    LDB_SCOPE_BASE,
1591                                    "(objectClass=*)",
1592                                    attrs,
1593                                    NULL,
1594                                    ar,
1595                                    replmd_replicated_uptodate_search_callback,
1596                                    ar->req);
1597         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1598
1599         return ldb_next_request(ar->module, search_req);
1600 }
1601
1602
1603
1604 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1605 {
1606         struct ldb_context *ldb;
1607         struct dsdb_extended_replicated_objects *objs;
1608         struct replmd_replicated_request *ar;
1609         struct ldb_control **ctrls;
1610         int ret, i;
1611         struct dsdb_control_current_partition *partition_ctrl;
1612         struct replmd_private *replmd_private = 
1613                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1614
1615         ldb = ldb_module_get_ctx(module);
1616
1617         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1618
1619         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1620         if (!objs) {
1621                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1622                 return LDB_ERR_PROTOCOL_ERROR;
1623         }
1624
1625         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1626                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1627                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1628                 return LDB_ERR_PROTOCOL_ERROR;
1629         }
1630
1631         ar = replmd_ctx_init(module, req);
1632         if (!ar)
1633                 return LDB_ERR_OPERATIONS_ERROR;
1634
1635         ar->objs = objs;
1636         ar->schema = dsdb_get_schema(ldb);
1637         if (!ar->schema) {
1638                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1639                 talloc_free(ar);
1640                 return LDB_ERR_CONSTRAINT_VIOLATION;
1641         }
1642
1643         ctrls = req->controls;
1644
1645         if (req->controls) {
1646                 req->controls = talloc_memdup(ar, req->controls,
1647                                               talloc_get_size(req->controls));
1648                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1649         }
1650
1651         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1652         if (ret != LDB_SUCCESS) {
1653                 return ret;
1654         }
1655
1656         /*
1657           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1658           tells the partition module which partition this request is
1659           directed at. That is important as the partition roots appear
1660           twice in the directory, once as mount points in the top
1661           level store, and once as the roots of each partition. The
1662           replication code wants to operate on the root of the
1663           partitions, not the top level mount points
1664          */
1665         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1666         if (partition_ctrl == NULL) {
1667                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1668         }
1669         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1670         partition_ctrl->dn = objs->partition_dn;
1671
1672         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1673         if (ret != LDB_SUCCESS) {
1674                 return ret;
1675         }
1676
1677         ar->controls = req->controls;
1678         req->controls = ctrls;
1679
1680         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1681
1682         /* save away the linked attributes for the end of the
1683            transaction */
1684         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1685                 struct la_entry *la_entry;
1686
1687                 if (replmd_private == NULL) {
1688                         DEBUG(0,(__location__ ": repl_meta_data not called from within a transaction\n"));
1689                         return LDB_ERR_OPERATIONS_ERROR;
1690                 }
1691
1692                 la_entry = talloc(replmd_private, struct la_entry);
1693                 if (la_entry == NULL) {
1694                         ldb_oom(ldb);
1695                         return LDB_ERR_OPERATIONS_ERROR;
1696                 }
1697                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1698                 if (la_entry->la == NULL) {
1699                         ldb_oom(ldb);
1700                         return LDB_ERR_OPERATIONS_ERROR;
1701                 }
1702                 *la_entry->la = ar->objs->linked_attributes[i];
1703
1704                 /* we need to steal the non-scalars so they stay
1705                    around until the end of the transaction */
1706                 talloc_steal(la_entry->la, la_entry->la->identifier);
1707                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1708
1709                 DLIST_ADD(replmd_private->la_list, la_entry);
1710         }
1711
1712         return replmd_replicated_apply_next(ar);
1713 }
1714
1715 /*
1716   process one linked attribute structure
1717  */
1718 static int replmd_process_linked_attribute(struct ldb_module *module,
1719                                            struct la_entry *la_entry)
1720 {                                          
1721         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1722         struct ldb_context *ldb = ldb_module_get_ctx(module);
1723         struct drsuapi_DsReplicaObjectIdentifier3 target;
1724         struct ldb_message *msg;
1725         struct ldb_message_element *ret_el;
1726         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1727         enum ndr_err_code ndr_err;
1728         char *target_dn;
1729         struct ldb_request *mod_req;
1730         int ret;
1731         const struct dsdb_attribute *attr;
1732
1733 /*
1734 linked_attributes[0]:                                                     
1735      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1736         identifier               : *                                      
1737             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1738                 __ndr_size               : 0x0000003a (58)                
1739                 __ndr_size_sid           : 0x00000000 (0)                 
1740                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1741                 sid                      : S-0-0                               
1742                 __ndr_size_dn            : 0x00000000 (0)                      
1743                 dn                       : ''                                  
1744         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1745         value: struct drsuapi_DsAttributeValue                                 
1746             __ndr_size               : 0x0000007e (126)                        
1747             blob                     : *                                       
1748                 blob                     : DATA_BLOB length=126                
1749         flags                    : 0x00000001 (1)                              
1750                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1751         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1752         meta_data: struct drsuapi_DsReplicaMetaData                            
1753             version                  : 0x00000015 (21)                         
1754             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
1755             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
1756             originating_usn          : 0x000000000001e19c (123292)             
1757      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
1758         __ndr_size               : 0x0000007e (126)                            
1759         __ndr_size_sid           : 0x0000001c (28)                             
1760         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
1761         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
1762         __ndr_size_dn            : 0x00000022 (34)                                
1763         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
1764  */
1765         if (DEBUGLVL(4)) {
1766                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
1767         }
1768         
1769         /* decode the target of the link */
1770         ndr_err = ndr_pull_struct_blob(la->value.blob, 
1771                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1772                                        &target,
1773                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
1774         if (ndr_err != NDR_ERR_SUCCESS) {
1775                 DEBUG(0,("Unable to decode linked_attribute target\n"));
1776                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
1777                 talloc_free(tmp_ctx);
1778                 return LDB_ERR_OPERATIONS_ERROR;
1779         }
1780         if (DEBUGLVL(4)) {
1781                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
1782         }
1783
1784         /* construct a modify request for this attribute change */
1785         msg = ldb_msg_new(tmp_ctx);
1786         if (!msg) {
1787                 ldb_oom(ldb);
1788                 talloc_free(tmp_ctx);
1789                 return LDB_ERR_OPERATIONS_ERROR;
1790         }
1791
1792         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
1793                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
1794         if (ret != LDB_SUCCESS) {
1795                 talloc_free(tmp_ctx);
1796                 return ret;
1797         }
1798
1799         /* find the attribute being modified */
1800         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
1801         if (attr == NULL) {
1802                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
1803                 talloc_free(tmp_ctx);
1804                 return LDB_ERR_OPERATIONS_ERROR;
1805         }
1806
1807         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
1808                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1809                                         LDB_FLAG_MOD_ADD, &ret_el);
1810         } else {
1811                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1812                                         LDB_FLAG_MOD_DELETE, &ret_el);
1813         }
1814         if (ret != LDB_SUCCESS) {
1815                 talloc_free(tmp_ctx);
1816                 return ret;
1817         }
1818         ret_el->values = talloc_array(msg, struct ldb_val, 1);
1819         if (!ret_el->values) {
1820                 ldb_oom(ldb);
1821                 talloc_free(tmp_ctx);
1822                 return LDB_ERR_OPERATIONS_ERROR;
1823         }
1824         ret_el->num_values = 1;
1825
1826         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
1827                                     GUID_string(tmp_ctx, &target.guid),
1828                                     dom_sid_string(tmp_ctx, &target.sid),
1829                                     target.dn);
1830         if (target_dn == NULL) {
1831                 ldb_oom(ldb);
1832                 talloc_free(tmp_ctx);
1833                 return LDB_ERR_OPERATIONS_ERROR;
1834         }
1835         ret_el->values[0] = data_blob_string_const(target_dn);
1836
1837         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
1838                                 msg,
1839                                 NULL,
1840                                 NULL, 
1841                                 ldb_op_default_callback,
1842                                 NULL);
1843         if (ret != LDB_SUCCESS) {
1844                 talloc_free(tmp_ctx);
1845                 return ret;
1846         }
1847         talloc_steal(mod_req, msg);
1848
1849         if (DEBUGLVL(4)) {
1850                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
1851                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
1852         }
1853
1854         /* Run the new request */
1855         ret = ldb_next_request(module, mod_req);
1856
1857         /* we need to wait for this to finish, as we are being called
1858            from the synchronous end_transaction hook of this module */
1859         if (ret == LDB_SUCCESS) {
1860                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
1861         }
1862
1863         if (ret != LDB_SUCCESS) {
1864                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
1865                           ldb_errstring(ldb),
1866                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
1867         }
1868         
1869         talloc_free(tmp_ctx);
1870
1871         return ret;     
1872 }
1873
1874 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1875 {
1876         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1877                 return replmd_extended_replicated_objects(module, req);
1878         }
1879
1880         return ldb_next_request(module, req);
1881 }
1882
1883
1884 /*
1885   we hook into the transaction operations to allow us to 
1886   perform the linked attribute updates at the end of the whole
1887   transaction. This allows a forward linked attribute to be created
1888   before the object is created. During a vampire, w2k8 sends us linked
1889   attributes before the objects they are part of.
1890  */
1891 static int replmd_start_transaction(struct ldb_module *module)
1892 {
1893         /* create our private structure for this transaction */
1894         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
1895                                                                 struct replmd_private);
1896         talloc_free(replmd_private);
1897         replmd_private = talloc(module, struct replmd_private);
1898         if (replmd_private == NULL) {
1899                 return LDB_ERR_OPERATIONS_ERROR;
1900         }
1901         replmd_private->la_list = NULL;
1902         ldb_module_set_private(module, replmd_private);
1903         return ldb_next_start_trans(module);
1904 }
1905
1906 /*
1907   on prepare commit we loop over our queued la_context structures and
1908   apply each of them  
1909  */
1910 static int replmd_prepare_commit(struct ldb_module *module)
1911 {
1912         struct replmd_private *replmd_private = 
1913                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1914         struct la_entry *la;
1915
1916         /* walk the list backwards, to do the first entry first, as we
1917          * added the entries with DLIST_ADD() which puts them at the
1918          * start of the list */
1919         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
1920
1921         for (; la; la=la->prev) {
1922                 int ret;
1923                 ret = replmd_process_linked_attribute(module, la);
1924                 if (ret != LDB_SUCCESS) {
1925                         return ret;
1926                 }
1927         }
1928
1929         talloc_free(replmd_private);
1930         ldb_module_set_private(module, NULL);
1931         
1932         return ldb_next_prepare_commit(module);
1933 }
1934
1935 static int replmd_del_transaction(struct ldb_module *module)
1936 {
1937         struct replmd_private *replmd_private = 
1938                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1939         talloc_free(replmd_private);
1940         ldb_module_set_private(module, NULL);
1941         return ldb_next_del_trans(module);
1942 }
1943
1944
1945 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
1946         .name          = "repl_meta_data",
1947         .add           = replmd_add,
1948         .modify        = replmd_modify,
1949         .extended      = replmd_extended,
1950         .start_transaction = replmd_start_transaction,
1951         .prepare_commit    = replmd_prepare_commit,
1952         .del_transaction   = replmd_del_transaction,
1953 };