repl_meta_data: Fix include path when building with standalone ldb.
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb repl_meta_data module
31  *
32  *  Description: - add a unique objectGUID onto every new record,
33  *               - handle whenCreated, whenChanged timestamps
34  *               - handle uSNCreated, uSNChanged numbers
35  *               - handle replPropertyMetaData attribute
36  *
37  *  Author: Simo Sorce
38  *  Author: Stefan Metzmacher
39  */
40
41 #include "includes.h"
42 #include "ldb_module.h"
43 #include "dsdb/samdb/samdb.h"
44 #include "dsdb/common/proto.h"
45 #include "../libds/common/flags.h"
46 #include "librpc/gen_ndr/ndr_misc.h"
47 #include "librpc/gen_ndr/ndr_drsuapi.h"
48 #include "librpc/gen_ndr/ndr_drsblobs.h"
49 #include "param/param.h"
50 #include "libcli/security/dom_sid.h"
51 #include "lib/util/dlinklist.h"
52
/*
 * Module-private data of the repl_meta_data module.
 */
53 struct replmd_private {
           /* head of a list of struct la_entry nodes.
            * NOTE(review): presumably pending linked-attribute changes; the
            * code that fills and drains this list is outside this view */
54         struct la_entry *la_list;
55 };
56
/*
 * One node of a doubly linked list, each carrying a pointer to a
 * drsuapi linked attribute.
 */
57 struct la_entry {
           /* list linkage.
            * NOTE(review): presumably managed with the DLIST_* macros from
            * lib/util/dlinklist.h - confirm against the list users */
58         struct la_entry *next, *prev;
           /* the linked attribute this node refers to */
59         struct drsuapi_DsReplicaLinkedAttribute *la;
60 };
61
/*
 * Per-request context of this module. It is created by
 * replmd_ctx_init() as a talloc child of the original request and is
 * handed to the callbacks of the requests sent down the module chain.
 */
62 struct replmd_replicated_request {
           /* the module instance and the request being serviced */
63         struct ldb_module *module;
64         struct ldb_request *req;
65
           /* schema used to resolve attribute names to attribute ids */
66         const struct dsdb_schema *schema;
67
           /* the replicated objects that are applied one after another */
68         struct dsdb_extended_replicated_objects *objs;
69
70         /* the controls we pass down */
71         struct ldb_control **controls;
72
           /* index into objs->objects of the object currently processed */
73         uint32_t index_current;
74
           /* NOTE(review): not referenced in the visible part of the file;
            * presumably holds the result of a search for the existing object */
75         struct ldb_message *search_msg;
76 };
77
78 /*
79   created a replmd_replicated_request context
80  */
81 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
82                                                          struct ldb_request *req)
83 {
84         struct ldb_context *ldb;
85         struct replmd_replicated_request *ac;
86
87         ldb = ldb_module_get_ctx(module);
88
89         ac = talloc_zero(req, struct replmd_replicated_request);
90         if (ac == NULL) {
91                 ldb_oom(ldb);
92                 return NULL;
93         }
94
95         ac->module = module;
96         ac->req = req;
97         return ac;
98 }
99
100 /*
101   add a time element to a record
102 */
103 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
104 {
105         struct ldb_message_element *el;
106         char *s;
107
108         if (ldb_msg_find_element(msg, attr) != NULL) {
109                 return LDB_SUCCESS;
110         }
111
112         s = ldb_timestring(msg, t);
113         if (s == NULL) {
114                 return LDB_ERR_OPERATIONS_ERROR;
115         }
116
117         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
118                 return LDB_ERR_OPERATIONS_ERROR;
119         }
120
121         el = ldb_msg_find_element(msg, attr);
122         /* always set as replace. This works because on add ops, the flag
123            is ignored */
124         el->flags = LDB_FLAG_MOD_REPLACE;
125
126         return LDB_SUCCESS;
127 }
128
129 /*
130   add a uint64_t element to a record
131 */
132 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
133 {
134         struct ldb_message_element *el;
135
136         if (ldb_msg_find_element(msg, attr) != NULL) {
137                 return LDB_SUCCESS;
138         }
139
140         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
141                 return LDB_ERR_OPERATIONS_ERROR;
142         }
143
144         el = ldb_msg_find_element(msg, attr);
145         /* always set as replace. This works because on add ops, the flag
146            is ignored */
147         el->flags = LDB_FLAG_MOD_REPLACE;
148
149         return LDB_SUCCESS;
150 }
151
152 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
153                                                    const struct replPropertyMetaData1 *m2,
154                                                    const uint32_t *rdn_attid)
155 {
156         if (m1->attid == m2->attid) {
157                 return 0;
158         }
159
160         /*
161          * the rdn attribute should be at the end!
162          * so we need to return a value greater than zero
163          * which means m1 is greater than m2
164          */
165         if (m1->attid == *rdn_attid) {
166                 return 1;
167         }
168
169         /*
170          * the rdn attribute should be at the end!
171          * so we need to return a value less than zero
172          * which means m2 is greater than m1
173          */
174         if (m2->attid == *rdn_attid) {
175                 return -1;
176         }
177
178         return m1->attid - m2->attid;
179 }
180
181 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
182                                                  const uint32_t *rdn_attid)
183 {
184         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
185                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
186 }
187
188 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
189                                                  const struct ldb_message_element *e2,
190                                                  const struct dsdb_schema *schema)
191 {
192         const struct dsdb_attribute *a1;
193         const struct dsdb_attribute *a2;
194
195         /* 
196          * TODO: make this faster by caching the dsdb_attribute pointer
197          *       on the ldb_messag_element
198          */
199
200         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
201         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
202
203         /*
204          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
205          *       in the schema
206          */
207         if (!a1 || !a2) {
208                 return strcasecmp(e1->name, e2->name);
209         }
210
211         return a1->attributeID_id - a2->attributeID_id;
212 }
213
214 static void replmd_ldb_message_sort(struct ldb_message *msg,
215                                     const struct dsdb_schema *schema)
216 {
217         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
218                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
219 }
220
221 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
222 {
223         struct ldb_context *ldb;
224         struct replmd_replicated_request *ac;
225
226         ac = talloc_get_type(req->context, struct replmd_replicated_request);
227         ldb = ldb_module_get_ctx(ac->module);
228
229         if (!ares) {
230                 return ldb_module_done(ac->req, NULL, NULL,
231                                         LDB_ERR_OPERATIONS_ERROR);
232         }
233         if (ares->error != LDB_SUCCESS) {
234                 return ldb_module_done(ac->req, ares->controls,
235                                         ares->response, ares->error);
236         }
237
238         if (ares->type != LDB_REPLY_DONE) {
239                 ldb_set_errstring(ldb,
240                                   "invalid ldb_reply_type in callback");
241                 talloc_free(ares);
242                 return ldb_module_done(ac->req, NULL, NULL,
243                                         LDB_ERR_OPERATIONS_ERROR);
244         }
245
246         return ldb_module_done(ac->req, ares->controls,
247                                 ares->response, LDB_SUCCESS);
248 }
249
250 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
251 {
252         struct ldb_context *ldb;
253         struct replmd_replicated_request *ac;
254         const struct dsdb_schema *schema;
255         enum ndr_err_code ndr_err;
256         struct ldb_request *down_req;
257         struct ldb_message *msg;
258         const struct dsdb_attribute *rdn_attr = NULL;
259         struct GUID guid;
260         struct ldb_val guid_value;
261         struct replPropertyMetaDataBlob nmd;
262         struct ldb_val nmd_value;
263         uint64_t seq_num;
264         const struct GUID *our_invocation_id;
265         time_t t = time(NULL);
266         NTTIME now;
267         char *time_str;
268         int ret;
269         uint32_t i, ni=0;
270
271         /* do not manipulate our control entries */
272         if (ldb_dn_is_special(req->op.add.message->dn)) {
273                 return ldb_next_request(module, req);
274         }
275
276         ldb = ldb_module_get_ctx(module);
277
278         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
279
280         schema = dsdb_get_schema(ldb);
281         if (!schema) {
282                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
283                               "replmd_modify: no dsdb_schema loaded");
284                 return LDB_ERR_CONSTRAINT_VIOLATION;
285         }
286
287         ac = replmd_ctx_init(module, req);
288         if (!ac) {
289                 return LDB_ERR_OPERATIONS_ERROR;
290         }
291
292         ac->schema = schema;
293
294         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
295                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
296                               "replmd_add: it's not allowed to add an object with objectGUID\n");
297                 return LDB_ERR_UNWILLING_TO_PERFORM;
298         }
299
300         /* Get a sequence number from the backend */
301         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
302         if (ret != LDB_SUCCESS) {
303                 return ret;
304         }
305
306         /* a new GUID */
307         guid = GUID_random();
308
309         /* get our invocationId */
310         our_invocation_id = samdb_ntds_invocation_id(ldb);
311         if (!our_invocation_id) {
312                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
313                               "replmd_add: unable to find invocationId\n");
314                 return LDB_ERR_OPERATIONS_ERROR;
315         }
316
317         /* we have to copy the message as the caller might have it as a const */
318         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
319         if (msg == NULL) {
320                 ldb_oom(ldb);
321                 return LDB_ERR_OPERATIONS_ERROR;
322         }
323
324         /* generated times */
325         unix_to_nt_time(&now, t);
326         time_str = ldb_timestring(msg, t);
327         if (!time_str) {
328                 return LDB_ERR_OPERATIONS_ERROR;
329         }
330
331         /* 
332          * remove autogenerated attributes
333          */
334         ldb_msg_remove_attr(msg, "whenCreated");
335         ldb_msg_remove_attr(msg, "whenChanged");
336         ldb_msg_remove_attr(msg, "uSNCreated");
337         ldb_msg_remove_attr(msg, "uSNChanged");
338         ldb_msg_remove_attr(msg, "replPropertyMetaData");
339
340         /*
341          * readd replicated attributes
342          */
343         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
344         if (ret != LDB_SUCCESS) {
345                 ldb_oom(ldb);
346                 return LDB_ERR_OPERATIONS_ERROR;
347         }
348
349         /* build the replication meta_data */
350         ZERO_STRUCT(nmd);
351         nmd.version             = 1;
352         nmd.ctr.ctr1.count      = msg->num_elements;
353         nmd.ctr.ctr1.array      = talloc_array(msg,
354                                                struct replPropertyMetaData1,
355                                                nmd.ctr.ctr1.count);
356         if (!nmd.ctr.ctr1.array) {
357                 ldb_oom(ldb);
358                 return LDB_ERR_OPERATIONS_ERROR;
359         }
360
361         for (i=0; i < msg->num_elements; i++) {
362                 struct ldb_message_element *e = &msg->elements[i];
363                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
364                 const struct dsdb_attribute *sa;
365
366                 if (e->name[0] == '@') continue;
367
368                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
369                 if (!sa) {
370                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
371                                       "replmd_add: attribute '%s' not defined in schema\n",
372                                       e->name);
373                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
374                 }
375
376                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
377                         /* if the attribute is not replicated (0x00000001)
378                          * or constructed (0x00000004) it has no metadata
379                          */
380                         continue;
381                 }
382
383                 m->attid                        = sa->attributeID_id;
384                 m->version                      = 1;
385                 m->originating_change_time      = now;
386                 m->originating_invocation_id    = *our_invocation_id;
387                 m->originating_usn              = seq_num;
388                 m->local_usn                    = seq_num;
389                 ni++;
390
391                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
392                         rdn_attr = sa;
393                 }
394         }
395
396         /* fix meta data count */
397         nmd.ctr.ctr1.count = ni;
398
399         /*
400          * sort meta data array, and move the rdn attribute entry to the end
401          */
402         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
403
404         /* generated NDR encoded values */
405         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
406                                        NULL,
407                                        &guid,
408                                        (ndr_push_flags_fn_t)ndr_push_GUID);
409         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
410                 ldb_oom(ldb);
411                 return LDB_ERR_OPERATIONS_ERROR;
412         }
413         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
414                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
415                                        &nmd,
416                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
417         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
418                 ldb_oom(ldb);
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421
422         /*
423          * add the autogenerated values
424          */
425         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
426         if (ret != LDB_SUCCESS) {
427                 ldb_oom(ldb);
428                 return LDB_ERR_OPERATIONS_ERROR;
429         }
430         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
431         if (ret != LDB_SUCCESS) {
432                 ldb_oom(ldb);
433                 return LDB_ERR_OPERATIONS_ERROR;
434         }
435         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
436         if (ret != LDB_SUCCESS) {
437                 ldb_oom(ldb);
438                 return LDB_ERR_OPERATIONS_ERROR;
439         }
440         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
441         if (ret != LDB_SUCCESS) {
442                 ldb_oom(ldb);
443                 return LDB_ERR_OPERATIONS_ERROR;
444         }
445         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
446         if (ret != LDB_SUCCESS) {
447                 ldb_oom(ldb);
448                 return LDB_ERR_OPERATIONS_ERROR;
449         }
450
451         /*
452          * sort the attributes by attid before storing the object
453          */
454         replmd_ldb_message_sort(msg, schema);
455
456         ret = ldb_build_add_req(&down_req, ldb, ac,
457                                 msg,
458                                 req->controls,
459                                 ac, replmd_op_callback,
460                                 req);
461         if (ret != LDB_SUCCESS) {
462                 return ret;
463         }
464
465         /* go on with the call chain */
466         return ldb_next_request(module, down_req);
467 }
468
469
470 /*
471  * update the replPropertyMetaData for one element
472  */
473 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
474                                       struct ldb_message *msg,
475                                       struct ldb_message_element *el,
476                                       struct replPropertyMetaDataBlob *omd,
477                                       struct dsdb_schema *schema,
478                                       uint64_t seq_num,
479                                       const struct GUID *our_invocation_id,
480                                       NTTIME now,
481                                       bool *modified)
482 {
483         int i;
484         const struct dsdb_attribute *a;
485         struct replPropertyMetaData1 *md1;
486
487         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
488         if (a == NULL) {
489                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
490                          el->name));
491                 return LDB_ERR_OPERATIONS_ERROR;
492         }
493
494         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
495                 /* if the attribute is not replicated (0x00000001)
496                  * or constructed (0x00000004) it has no metadata
497                  */
498                 return LDB_SUCCESS;
499         }
500
501         for (i=0; i<omd->ctr.ctr1.count; i++) {
502                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
503         }
504         if (i == omd->ctr.ctr1.count) {
505                 /* we need to add a new one */
506                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
507                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
508                 if (omd->ctr.ctr1.array == NULL) {
509                         ldb_oom(ldb);
510                         return LDB_ERR_OPERATIONS_ERROR;
511                 }
512                 omd->ctr.ctr1.count++;
513         }
514
515         md1 = &omd->ctr.ctr1.array[i];
516         md1->version                   = 1;
517         md1->attid                     = a->attributeID_id;
518         md1->originating_change_time   = now;
519         md1->originating_invocation_id = *our_invocation_id;
520         md1->originating_usn           = seq_num;
521         md1->local_usn                 = seq_num;
522         
523         *modified = true;
524         return LDB_SUCCESS;
525 }
526
527 /*
528  * update the replPropertyMetaData object each time we modify an
529  * object. This is needed for DRS replication, as the merge on the
530  * client is based on this object 
531  */
532 static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
533                               uint64_t seq_num)
534 {
535         const struct ldb_val *omd_value;
536         enum ndr_err_code ndr_err;
537         struct replPropertyMetaDataBlob omd;
538         int i;
539         bool modified = false;
540         struct dsdb_schema *schema;
541         time_t t = time(NULL);
542         NTTIME now;
543         const struct GUID *our_invocation_id;
544         int ret;
545         const char *attrs[] = { "replPropertyMetaData" , NULL };
546         struct ldb_result *res;
547
548         our_invocation_id = samdb_ntds_invocation_id(ldb);
549         if (!our_invocation_id) {
550                 /* this happens during an initial vampire while
551                    updating the schema */
552                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
553                 return LDB_SUCCESS;
554         }
555
556         unix_to_nt_time(&now, t);
557
558         /* search for the existing replPropertyMetaDataBlob */
559         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
560         if (ret != LDB_SUCCESS || res->count < 1) {
561                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
562                          ldb_dn_get_linearized(msg->dn)));
563                 return LDB_ERR_OPERATIONS_ERROR;
564         }
565                 
566
567         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
568         if (!omd_value) {
569                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
570                          ldb_dn_get_linearized(msg->dn)));
571                 return LDB_ERR_OPERATIONS_ERROR;
572         }
573
574         ndr_err = ndr_pull_struct_blob(omd_value, msg,
575                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
576                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
577         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
578                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
579                          ldb_dn_get_linearized(msg->dn)));
580                 return LDB_ERR_OPERATIONS_ERROR;
581         }
582
583         if (omd.version != 1) {
584                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
585                          omd.version, ldb_dn_get_linearized(msg->dn)));
586                 return LDB_ERR_OPERATIONS_ERROR;
587         }
588
589         schema = dsdb_get_schema(ldb);
590
591         for (i=0; i<msg->num_elements; i++) {
592                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
593                                                  our_invocation_id, now, &modified);
594                 if (ret != LDB_SUCCESS) {
595                         return ret;
596                 }
597         }
598
599         if (modified) {
600                 struct ldb_val *md_value;
601                 struct ldb_message_element *el;
602
603                 md_value = talloc(msg, struct ldb_val);
604                 if (md_value == NULL) {
605                         ldb_oom(ldb);
606                         return LDB_ERR_OPERATIONS_ERROR;
607                 }
608
609                 ndr_err = ndr_push_struct_blob(md_value, msg, 
610                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
611                                                &omd,
612                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
613                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
614                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
615                                  ldb_dn_get_linearized(msg->dn)));
616                         return LDB_ERR_OPERATIONS_ERROR;
617                 }
618
619                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
620                 if (ret != LDB_SUCCESS) {
621                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
622                                  ldb_dn_get_linearized(msg->dn)));
623                         return ret;
624                 }
625
626                 el->num_values = 1;
627                 el->values = md_value;
628         }
629
630         return LDB_SUCCESS;     
631 }
632
633
634 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
635 {
636         struct ldb_context *ldb;
637         struct replmd_replicated_request *ac;
638         const struct dsdb_schema *schema;
639         struct ldb_request *down_req;
640         struct ldb_message *msg;
641         int ret;
642         time_t t = time(NULL);
643         uint64_t seq_num;
644
645         /* do not manipulate our control entries */
646         if (ldb_dn_is_special(req->op.mod.message->dn)) {
647                 return ldb_next_request(module, req);
648         }
649
650         ldb = ldb_module_get_ctx(module);
651
652         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
653
654         schema = dsdb_get_schema(ldb);
655         if (!schema) {
656                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
657                               "replmd_modify: no dsdb_schema loaded");
658                 return LDB_ERR_CONSTRAINT_VIOLATION;
659         }
660
661         ac = replmd_ctx_init(module, req);
662         if (!ac) {
663                 return LDB_ERR_OPERATIONS_ERROR;
664         }
665
666         ac->schema = schema;
667
668         /* we have to copy the message as the caller might have it as a const */
669         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
670         if (msg == NULL) {
671                 talloc_free(ac);
672                 return LDB_ERR_OPERATIONS_ERROR;
673         }
674
675         /* TODO:
676          * - get the whole old object
677          * - if the old object doesn't exist report an error
678          * - give an error when a readonly attribute should
679          *   be modified
680          * - merge the changed into the old object
681          *   if the caller set values to the same value
682          *   ignore the attribute, return success when no
683          *   attribute was changed
684          */
685
686         /* Get a sequence number from the backend */
687         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
688         if (ret != LDB_SUCCESS) {
689                 return LDB_ERR_OPERATIONS_ERROR;
690         }
691
692         ret = replmd_update_rpmd(ldb, msg, seq_num);
693         if (ret != LDB_SUCCESS) {
694                 return ret;
695         }
696
697         if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
698                 talloc_free(ac);
699                 return LDB_ERR_OPERATIONS_ERROR;
700         }
701
702         if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
703                 talloc_free(ac);
704                 return LDB_ERR_OPERATIONS_ERROR;
705         }
706
707         /* TODO:
708          * - sort the attributes by attid with replmd_ldb_message_sort()
709          * - replace the old object with the newly constructed one
710          */
711
712         ret = ldb_build_mod_req(&down_req, ldb, ac,
713                                 msg,
714                                 req->controls,
715                                 ac, replmd_op_callback,
716                                 req);
717         if (ret != LDB_SUCCESS) {
718                 return ret;
719         }
720         talloc_steal(down_req, msg);
721
722         /* go on with the call chain */
723         return ldb_next_request(module, down_req);
724 }
725
/*
 * Report an ldb error for a replicated request.
 *
 * The error code is handed back unchanged, the request context is not
 * looked at.
 */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	(void)ar;

	return ret;
}
730
731 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
732 {
733         int ret = LDB_ERR_OTHER;
734         /* TODO: do some error mapping */
735         return ret;
736 }
737
738 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
739
740 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
741                                                 struct ldb_reply *ares)
742 {
743         struct ldb_context *ldb;
744         struct replmd_replicated_request *ar = talloc_get_type(req->context,
745                                                struct replmd_replicated_request);
746         int ret;
747
748         ldb = ldb_module_get_ctx(ar->module);
749
750         if (!ares) {
751                 return ldb_module_done(ar->req, NULL, NULL,
752                                         LDB_ERR_OPERATIONS_ERROR);
753         }
754         if (ares->error != LDB_SUCCESS) {
755                 return ldb_module_done(ar->req, ares->controls,
756                                         ares->response, ares->error);
757         }
758
759         if (ares->type != LDB_REPLY_DONE) {
760                 ldb_set_errstring(ldb, "Invalid reply type\n!");
761                 return ldb_module_done(ar->req, NULL, NULL,
762                                         LDB_ERR_OPERATIONS_ERROR);
763         }
764
765         talloc_free(ares);
766         ar->index_current++;
767
768         ret = replmd_replicated_apply_next(ar);
769         if (ret != LDB_SUCCESS) {
770                 return ldb_module_done(ar->req, NULL, NULL, ret);
771         }
772
773         return LDB_SUCCESS;
774 }
775
776 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
777 {
778         struct ldb_context *ldb;
779         struct ldb_request *change_req;
780         enum ndr_err_code ndr_err;
781         struct ldb_message *msg;
782         struct replPropertyMetaDataBlob *md;
783         struct ldb_val md_value;
784         uint32_t i;
785         uint64_t seq_num;
786         int ret;
787
788         /*
789          * TODO: check if the parent object exist
790          */
791
792         /*
793          * TODO: handle the conflict case where an object with the
794          *       same name exist
795          */
796
797         ldb = ldb_module_get_ctx(ar->module);
798         msg = ar->objs->objects[ar->index_current].msg;
799         md = ar->objs->objects[ar->index_current].meta_data;
800
801         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
802         if (ret != LDB_SUCCESS) {
803                 return replmd_replicated_request_error(ar, ret);
804         }
805
806         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
807         if (ret != LDB_SUCCESS) {
808                 return replmd_replicated_request_error(ar, ret);
809         }
810
811         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
812         if (ret != LDB_SUCCESS) {
813                 return replmd_replicated_request_error(ar, ret);
814         }
815
816         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
817         if (ret != LDB_SUCCESS) {
818                 return replmd_replicated_request_error(ar, ret);
819         }
820
821         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
822         if (ret != LDB_SUCCESS) {
823                 return replmd_replicated_request_error(ar, ret);
824         }
825
826         /*
827          * the meta data array is already sorted by the caller
828          */
829         for (i=0; i < md->ctr.ctr1.count; i++) {
830                 md->ctr.ctr1.array[i].local_usn = seq_num;
831         }
832         ndr_err = ndr_push_struct_blob(&md_value, msg, 
833                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
834                                        md,
835                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
836         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
837                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
838                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
839         }
840         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
841         if (ret != LDB_SUCCESS) {
842                 return replmd_replicated_request_error(ar, ret);
843         }
844
845         replmd_ldb_message_sort(msg, ar->schema);
846
847         if (DEBUGLVL(4)) {
848                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
849                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
850                 talloc_free(s);
851         }
852
853         ret = ldb_build_add_req(&change_req,
854                                 ldb,
855                                 ar,
856                                 msg,
857                                 ar->controls,
858                                 ar,
859                                 replmd_replicated_apply_add_callback,
860                                 ar->req);
861         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
862
863         return ldb_next_request(ar->module, change_req);
864 }
865
866 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
867                                                          struct replPropertyMetaData1 *m2)
868 {
869         int ret;
870
871         if (m1->version != m2->version) {
872                 return m1->version - m2->version;
873         }
874
875         if (m1->originating_change_time != m2->originating_change_time) {
876                 return m1->originating_change_time - m2->originating_change_time;
877         }
878
879         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
880         if (ret != 0) {
881                 return ret;
882         }
883
884         return m1->originating_usn - m2->originating_usn;
885 }
886
887 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
888                                                   struct ldb_reply *ares)
889 {
890         struct ldb_context *ldb;
891         struct replmd_replicated_request *ar = talloc_get_type(req->context,
892                                                struct replmd_replicated_request);
893         int ret;
894
895         ldb = ldb_module_get_ctx(ar->module);
896
897         if (!ares) {
898                 return ldb_module_done(ar->req, NULL, NULL,
899                                         LDB_ERR_OPERATIONS_ERROR);
900         }
901         if (ares->error != LDB_SUCCESS) {
902                 return ldb_module_done(ar->req, ares->controls,
903                                         ares->response, ares->error);
904         }
905
906         if (ares->type != LDB_REPLY_DONE) {
907                 ldb_set_errstring(ldb, "Invalid reply type\n!");
908                 return ldb_module_done(ar->req, NULL, NULL,
909                                         LDB_ERR_OPERATIONS_ERROR);
910         }
911
912         talloc_free(ares);
913         ar->index_current++;
914
915         ret = replmd_replicated_apply_next(ar);
916         if (ret != LDB_SUCCESS) {
917                 return ldb_module_done(ar->req, NULL, NULL, ret);
918         }
919
920         return LDB_SUCCESS;
921 }
922
923 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
924 {
925         struct ldb_context *ldb;
926         struct ldb_request *change_req;
927         enum ndr_err_code ndr_err;
928         struct ldb_message *msg;
929         struct replPropertyMetaDataBlob *rmd;
930         struct replPropertyMetaDataBlob omd;
931         const struct ldb_val *omd_value;
932         struct replPropertyMetaDataBlob nmd;
933         struct ldb_val nmd_value;
934         uint32_t i,j,ni=0;
935         uint32_t removed_attrs = 0;
936         uint64_t seq_num;
937         int ret;
938
939         ldb = ldb_module_get_ctx(ar->module);
940         msg = ar->objs->objects[ar->index_current].msg;
941         rmd = ar->objs->objects[ar->index_current].meta_data;
942         ZERO_STRUCT(omd);
943         omd.version = 1;
944
945         /*
946          * TODO: check repl data is correct after a rename
947          */
948         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
949                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
950                           ldb_dn_get_linearized(ar->search_msg->dn),
951                           ldb_dn_get_linearized(msg->dn));
952                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
953                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
954                                   ldb_dn_get_linearized(ar->search_msg->dn),
955                                   ldb_dn_get_linearized(msg->dn),
956                                   ldb_errstring(ldb));
957                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
958                 }
959         }
960
961         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
962         if (ret != LDB_SUCCESS) {
963                 return replmd_replicated_request_error(ar, ret);
964         }
965
966         /* find existing meta data */
967         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
968         if (omd_value) {
969                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
970                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
971                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
972                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
973                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
974                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
975                 }
976
977                 if (omd.version != 1) {
978                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
979                 }
980         }
981
982         ZERO_STRUCT(nmd);
983         nmd.version = 1;
984         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
985         nmd.ctr.ctr1.array = talloc_array(ar,
986                                           struct replPropertyMetaData1,
987                                           nmd.ctr.ctr1.count);
988         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
989
990         /* first copy the old meta data */
991         for (i=0; i < omd.ctr.ctr1.count; i++) {
992                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
993                 ni++;
994         }
995
996         /* now merge in the new meta data */
997         for (i=0; i < rmd->ctr.ctr1.count; i++) {
998                 bool found = false;
999
1000                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
1001
1002                 for (j=0; j < ni; j++) {
1003                         int cmp;
1004
1005                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1006                                 continue;
1007                         }
1008
1009                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1010                                                                             &nmd.ctr.ctr1.array[j]);
1011                         if (cmp > 0) {
1012                                 /* replace the entry */
1013                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1014                                 found = true;
1015                                 break;
1016                         }
1017
1018                         /* we don't want to apply this change so remove the attribute */
1019                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1020                         removed_attrs++;
1021
1022                         found = true;
1023                         break;
1024                 }
1025
1026                 if (found) continue;
1027
1028                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1029                 ni++;
1030         }
1031
1032         /*
1033          * finally correct the size of the meta_data array
1034          */
1035         nmd.ctr.ctr1.count = ni;
1036
1037         /*
1038          * the rdn attribute (the alias for the name attribute),
1039          * 'cn' for most objects is the last entry in the meta data array
1040          * we have stored
1041          *
1042          * sort the new meta data array
1043          */
1044         {
1045                 struct replPropertyMetaData1 *rdn_p;
1046                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1047
1048                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1049                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1050         }
1051
1052         /* create the meta data value */
1053         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1054                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1055                                        &nmd,
1056                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1057         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1058                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1059                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1060         }
1061
1062         /*
1063          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1064          */
1065         if (msg->num_elements == 0) {
1066                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1067                           ar->index_current);
1068
1069                 ar->index_current++;
1070                 return replmd_replicated_apply_next(ar);
1071         }
1072
1073         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1074                   ar->index_current, msg->num_elements);
1075
1076         /*
1077          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1078          * and replPopertyMetaData attributes
1079          */
1080         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1081         if (ret != LDB_SUCCESS) {
1082                 return replmd_replicated_request_error(ar, ret);
1083         }
1084         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1085         if (ret != LDB_SUCCESS) {
1086                 return replmd_replicated_request_error(ar, ret);
1087         }
1088         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1089         if (ret != LDB_SUCCESS) {
1090                 return replmd_replicated_request_error(ar, ret);
1091         }
1092
1093         replmd_ldb_message_sort(msg, ar->schema);
1094
1095         /* we want to replace the old values */
1096         for (i=0; i < msg->num_elements; i++) {
1097                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1098         }
1099
1100         if (DEBUGLVL(4)) {
1101                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1102                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1103                 talloc_free(s);
1104         }
1105
1106         ret = ldb_build_mod_req(&change_req,
1107                                 ldb,
1108                                 ar,
1109                                 msg,
1110                                 ar->controls,
1111                                 ar,
1112                                 replmd_replicated_apply_merge_callback,
1113                                 ar->req);
1114         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1115
1116         return ldb_next_request(ar->module, change_req);
1117 }
1118
1119 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1120                                                    struct ldb_reply *ares)
1121 {
1122         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1123                                                struct replmd_replicated_request);
1124         int ret;
1125
1126         if (!ares) {
1127                 return ldb_module_done(ar->req, NULL, NULL,
1128                                         LDB_ERR_OPERATIONS_ERROR);
1129         }
1130         if (ares->error != LDB_SUCCESS &&
1131             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1132                 return ldb_module_done(ar->req, ares->controls,
1133                                         ares->response, ares->error);
1134         }
1135
1136         switch (ares->type) {
1137         case LDB_REPLY_ENTRY:
1138                 ar->search_msg = talloc_steal(ar, ares->message);
1139                 break;
1140
1141         case LDB_REPLY_REFERRAL:
1142                 /* we ignore referrals */
1143                 break;
1144
1145         case LDB_REPLY_DONE:
1146                 if (ar->search_msg != NULL) {
1147                         ret = replmd_replicated_apply_merge(ar);
1148                 } else {
1149                         ret = replmd_replicated_apply_add(ar);
1150                 }
1151                 if (ret != LDB_SUCCESS) {
1152                         return ldb_module_done(ar->req, NULL, NULL, ret);
1153                 }
1154         }
1155
1156         talloc_free(ares);
1157         return LDB_SUCCESS;
1158 }
1159
1160 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1161
1162 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1163 {
1164         struct ldb_context *ldb;
1165         int ret;
1166         char *tmp_str;
1167         char *filter;
1168         struct ldb_request *search_req;
1169
1170         if (ar->index_current >= ar->objs->num_objects) {
1171                 /* done with it, go to next stage */
1172                 return replmd_replicated_uptodate_vector(ar);
1173         }
1174
1175         ldb = ldb_module_get_ctx(ar->module);
1176         ar->search_msg = NULL;
1177
1178         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1179         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1180
1181         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1182         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1183         talloc_free(tmp_str);
1184
1185         ret = ldb_build_search_req(&search_req,
1186                                    ldb,
1187                                    ar,
1188                                    ar->objs->partition_dn,
1189                                    LDB_SCOPE_SUBTREE,
1190                                    filter,
1191                                    NULL,
1192                                    NULL,
1193                                    ar,
1194                                    replmd_replicated_apply_search_callback,
1195                                    ar->req);
1196         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1197
1198         return ldb_next_request(ar->module, search_req);
1199 }
1200
1201 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1202                                                       struct ldb_reply *ares)
1203 {
1204         struct ldb_context *ldb;
1205         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1206                                                struct replmd_replicated_request);
1207         ldb = ldb_module_get_ctx(ar->module);
1208
1209         if (!ares) {
1210                 return ldb_module_done(ar->req, NULL, NULL,
1211                                         LDB_ERR_OPERATIONS_ERROR);
1212         }
1213         if (ares->error != LDB_SUCCESS) {
1214                 return ldb_module_done(ar->req, ares->controls,
1215                                         ares->response, ares->error);
1216         }
1217
1218         if (ares->type != LDB_REPLY_DONE) {
1219                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1220                 return ldb_module_done(ar->req, NULL, NULL,
1221                                         LDB_ERR_OPERATIONS_ERROR);
1222         }
1223
1224         talloc_free(ares);
1225
1226         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1227 }
1228
1229 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1230                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1231 {
1232         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1233 }
1234
/*
 * Build and send the modify request that stores the new
 * replUpToDateVector and repsFrom values on the record in ar->search_msg.
 *
 * The new up-to-dateness vector is the union of
 *   - the vector currently stored on the record (if any),
 *   - the vector received from the source dsa (if any, skipping the
 *     cursor for our own invocation id), and
 *   - a cursor for the source dsa itself, using its tmp_highest_usn and
 *     the current time.
 *
 * The repsFrom attribute gets one value for the source dsa: an existing
 * value with the same source_dsa_obj_guid is overwritten, otherwise a new
 * value is appended.  Both attributes are written with
 * LDB_FLAG_MOD_REPLACE.
 *
 * The reply is handled by replmd_replicated_uptodate_modify_callback().
 *
 * Returns the result of ldb_next_request(), or the value of
 * replmd_replicated_request_error()/_werror() on failure.
 *
 * NOTE(review): ar->search_msg is dereferenced without a NULL check, so
 * callers must only get here after a record was found.
 */
1235 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1236 {
1237         struct ldb_context *ldb;
1238         struct ldb_request *change_req;
1239         enum ndr_err_code ndr_err;
1240         struct ldb_message *msg;
1241         struct replUpToDateVectorBlob ouv;
1242         const struct ldb_val *ouv_value;
1243         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1244         struct replUpToDateVectorBlob nuv;
1245         struct ldb_val nuv_value;
1246         struct ldb_message_element *nuv_el = NULL;
1247         const struct GUID *our_invocation_id;
1248         struct ldb_message_element *orf_el = NULL;
1249         struct repsFromToBlob nrf;
1250         struct ldb_val *nrf_value = NULL;
1251         struct ldb_message_element *nrf_el = NULL;
1252         uint32_t i,j,ni=0;
1253         bool found = false;
1254         time_t t = time(NULL);
1255         NTTIME now;
1256         int ret;
1257
1258         ldb = ldb_module_get_ctx(ar->module);
1259         ruv = ar->objs->uptodateness_vector;
1260         ZERO_STRUCT(ouv);
1261         ouv.version = 2;
1262         ZERO_STRUCT(nuv);
1263         nuv.version = 2;
1264
1265         unix_to_nt_time(&now, t);
1266
1267         /*
1268          * first create the new replUpToDateVector
1269          */
1270         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1271         if (ouv_value) {
1272                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1273                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1274                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1275                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1276                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1277                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1278                 }
1279
1280                 if (ouv.version != 2) {
1281                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1282                 }
1283         }
1284
1285         /*
1286          * the new uptodateness vector will at least
1287          * contain 1 entry, one for the source_dsa
1288          *
1289          * plus optional values from our old vector and the one from the source_dsa
1290          */
        /* this is an upper bound; the real count (ni) is set after merging */
1291         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1292         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1293         nuv.ctr.ctr2.cursors = talloc_array(ar,
1294                                             struct drsuapi_DsReplicaCursor2,
1295                                             nuv.ctr.ctr2.count);
1296         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1297
1298         /* first copy the old vector */
1299         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1300                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1301                 ni++;
1302         }
1303
1304         /* get our invocation_id if we have one already attached to the ldb */
1305         our_invocation_id = samdb_ntds_invocation_id(ldb);
1306
1307         /* merge in the source_dsa vector if it is available */
1308         for (i=0; (ruv && i < ruv->count); i++) {
1309                 found = false;
1310
                /* never store a cursor for our own invocation id */
1311                 if (our_invocation_id &&
1312                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1313                                our_invocation_id)) {
1314                         continue;
1315                 }
1316
1317                 for (j=0; j < ni; j++) {
1318                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1319                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1320                                 continue;
1321                         }
1322
1323                         found = true;
1324
1325                         /*
1326                          * we update only the highest_usn and not the latest_sync_success time,
1327                          * because the last success stands for direct replication
1328                          */
1329                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1330                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1331                         }
1332                         break;                  
1333                 }
1334
1335                 if (found) continue;
1336
1337                 /* if it's not there yet, add it */
1338                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1339                 ni++;
1340         }
1341
1342         /*
1343          * merge in the current highwatermark for the source_dsa
1344          */
1345         found = false;
1346         for (j=0; j < ni; j++) {
1347                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1348                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1349                         continue;
1350                 }
1351
1352                 found = true;
1353
1354                 /*
1355                  * here we update the highest_usn and last_sync_success time
1356                  * because we're directly replicating from the source_dsa
1357                  *
1358                  * and use the tmp_highest_usn because this is what we have just applied
1359                  * to our ldb
1360                  */
1361                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1362                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1363                 break;
1364         }
1365         if (!found) {
1366                 /*
1367                  * no cursor for the source_dsa exists yet, so append one
1368                  * with the highest_usn and last_sync_success time,
1369                  * because we're directly replicating from the source_dsa
1370                  *
1371                  * and use the tmp_highest_usn because this is what we have just applied
1372                  * to our ldb
1372                  */
1373                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1374                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1375                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1376                 ni++;
1377         }
1378
1379         /*
1380          * finally correct the size of the cursors array
1381          */
1382         nuv.ctr.ctr2.count = ni;
1383
1384         /*
1385          * sort the cursors
1386          */
1387         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1388               sizeof(struct drsuapi_DsReplicaCursor2),
1389               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1390
1391         /*
1392          * create the change ldb_message
1393          */
1394         msg = ldb_msg_new(ar);
1395         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1396         msg->dn = ar->search_msg->dn;
1397
1398         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1399                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1400                                        &nuv,
1401                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1402         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1403                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1404                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1405         }
1406         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1407         if (ret != LDB_SUCCESS) {
1408                 return replmd_replicated_request_error(ar, ret);
1409         }
1410         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1411
1412         /*
1413          * now create the new repsFrom value from the given repsFromTo1 structure
1414          */
1415         ZERO_STRUCT(nrf);
1416         nrf.version                                     = 1;
1417         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1418         /* and fix some values... */
1419         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1420         nrf.ctr.ctr1.last_success                       = now;
1421         nrf.ctr.ctr1.last_attempt                       = now;
1422         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1423         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1424
1425         /*
1426          * first see if we already have a repsFrom value for the current source dsa
1427          * if so we'll later replace this value
1428          */
1429         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1430         if (orf_el) {
1431                 for (i=0; i < orf_el->num_values; i++) {
1432                         struct repsFromToBlob *trf;
1433
1434                         trf = talloc(ar, struct repsFromToBlob);
1435                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1436
1437                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1438                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1439                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1440                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1441                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1442                         }
1443
1444                         if (trf->version != 1) {
1445                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1446                         }
1447
1448                         /*
1449                          * we compare the source dsa objectGUID not the invocation_id
1450                          * because we want only one repsFrom value per source dsa
1451                          * and when the invocation_id of the source dsa has changed we don't need 
1452                          * the old repsFrom with the old invocation_id
1453                          */
1454                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1455                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1456                                 talloc_free(trf);
1457                                 continue;
1458                         }
1459
                        /*
                         * remember the matching value slot; it is
                         * overwritten in place further down
                         */
1460                         talloc_free(trf);
1461                         nrf_value = &orf_el->values[i];
1462                         break;
1463                 }
1464
1465                 /*
1466                  * copy over all old values to the new ldb_message
1467                  *
1468                  * this is a plain struct copy: the new element refers to
1469                  * the same values array as the element in search_msg
1470                  */
1468                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1469                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1470                 *nrf_el = *orf_el;
1471         }
1472
1473         /*
1474          * if we haven't found an old repsFrom value for the current source dsa
1475          * we'll add a new value
1476          */
1477         if (!nrf_value) {
1478                 struct ldb_val zero_value;
1479                 ZERO_STRUCT(zero_value);
1480                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1481                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1482
                /* the empty placeholder just added is the last value */
1483                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1484         }
1485
1486         /* we now fill the value which is already attached to ldb_message */
1487         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1488                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1489                                        &nrf,
1490                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1491         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1492                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1493                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1494         }
1495
1496         /* 
1497          * the ldb_message_element for the attribute, has all the old values and the new one
1498          * so we'll replace the whole attribute with all values
1499          */
1500         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1501
1502         if (DEBUGLVL(4)) {
1503                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1504                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1505                 talloc_free(s);
1506         }
1507
1508         /* prepare the ldb_modify() request */
1509         ret = ldb_build_mod_req(&change_req,
1510                                 ldb,
1511                                 ar,
1512                                 msg,
1513                                 ar->controls,
1514                                 ar,
1515                                 replmd_replicated_uptodate_modify_callback,
1516                                 ar->req);
1517         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1518
1519         return ldb_next_request(ar->module, change_req);
1520 }
1521
1522 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1523                                                       struct ldb_reply *ares)
1524 {
1525         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1526                                                struct replmd_replicated_request);
1527         int ret;
1528
1529         if (!ares) {
1530                 return ldb_module_done(ar->req, NULL, NULL,
1531                                         LDB_ERR_OPERATIONS_ERROR);
1532         }
1533         if (ares->error != LDB_SUCCESS &&
1534             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1535                 return ldb_module_done(ar->req, ares->controls,
1536                                         ares->response, ares->error);
1537         }
1538
1539         switch (ares->type) {
1540         case LDB_REPLY_ENTRY:
1541                 ar->search_msg = talloc_steal(ar, ares->message);
1542                 break;
1543
1544         case LDB_REPLY_REFERRAL:
1545                 /* we ignore referrals */
1546                 break;
1547
1548         case LDB_REPLY_DONE:
1549                 if (ar->search_msg == NULL) {
1550                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1551                 } else {
1552                         ret = replmd_replicated_uptodate_modify(ar);
1553                 }
1554                 if (ret != LDB_SUCCESS) {
1555                         return ldb_module_done(ar->req, NULL, NULL, ret);
1556                 }
1557         }
1558
1559         talloc_free(ares);
1560         return LDB_SUCCESS;
1561 }
1562
1563
1564 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1565 {
1566         struct ldb_context *ldb;
1567         int ret;
1568         static const char *attrs[] = {
1569                 "replUpToDateVector",
1570                 "repsFrom",
1571                 NULL
1572         };
1573         struct ldb_request *search_req;
1574
1575         ldb = ldb_module_get_ctx(ar->module);
1576         ar->search_msg = NULL;
1577
1578         ret = ldb_build_search_req(&search_req,
1579                                    ldb,
1580                                    ar,
1581                                    ar->objs->partition_dn,
1582                                    LDB_SCOPE_BASE,
1583                                    "(objectClass=*)",
1584                                    attrs,
1585                                    NULL,
1586                                    ar,
1587                                    replmd_replicated_uptodate_search_callback,
1588                                    ar->req);
1589         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1590
1591         return ldb_next_request(ar->module, search_req);
1592 }
1593
1594
1595
1596 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1597 {
1598         struct ldb_context *ldb;
1599         struct dsdb_extended_replicated_objects *objs;
1600         struct replmd_replicated_request *ar;
1601         struct ldb_control **ctrls;
1602         int ret, i;
1603         struct dsdb_control_current_partition *partition_ctrl;
1604         struct replmd_private *replmd_private = 
1605                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1606
1607         ldb = ldb_module_get_ctx(module);
1608
1609         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1610
1611         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1612         if (!objs) {
1613                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1614                 return LDB_ERR_PROTOCOL_ERROR;
1615         }
1616
1617         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1618                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1619                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1620                 return LDB_ERR_PROTOCOL_ERROR;
1621         }
1622
1623         ar = replmd_ctx_init(module, req);
1624         if (!ar)
1625                 return LDB_ERR_OPERATIONS_ERROR;
1626
1627         ar->objs = objs;
1628         ar->schema = dsdb_get_schema(ldb);
1629         if (!ar->schema) {
1630                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1631                 talloc_free(ar);
1632                 return LDB_ERR_CONSTRAINT_VIOLATION;
1633         }
1634
1635         ctrls = req->controls;
1636
1637         if (req->controls) {
1638                 req->controls = talloc_memdup(ar, req->controls,
1639                                               talloc_get_size(req->controls));
1640                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1641         }
1642
1643         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1644         if (ret != LDB_SUCCESS) {
1645                 return ret;
1646         }
1647
1648         /*
1649           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1650           tells the partition module which partition this request is
1651           directed at. That is important as the partition roots appear
1652           twice in the directory, once as mount points in the top
1653           level store, and once as the roots of each partition. The
1654           replication code wants to operate on the root of the
1655           partitions, not the top level mount points
1656          */
1657         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1658         if (partition_ctrl == NULL) {
1659                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1660         }
1661         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1662         partition_ctrl->dn = objs->partition_dn;
1663
1664         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1665         if (ret != LDB_SUCCESS) {
1666                 return ret;
1667         }
1668
1669         ar->controls = req->controls;
1670         req->controls = ctrls;
1671
1672         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1673
1674         /* save away the linked attributes for the end of the
1675            transaction */
1676         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1677                 struct la_entry *la_entry;
1678
1679                 if (replmd_private == NULL) {
1680                         DEBUG(0,(__location__ ": repl_meta_data not called from within a transaction\n"));
1681                         return LDB_ERR_OPERATIONS_ERROR;
1682                 }
1683
1684                 la_entry = talloc(replmd_private, struct la_entry);
1685                 if (la_entry == NULL) {
1686                         ldb_oom(ldb);
1687                         return LDB_ERR_OPERATIONS_ERROR;
1688                 }
1689                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1690                 if (la_entry->la == NULL) {
1691                         ldb_oom(ldb);
1692                         return LDB_ERR_OPERATIONS_ERROR;
1693                 }
1694                 *la_entry->la = ar->objs->linked_attributes[i];
1695
1696                 /* we need to steal the non-scalars so they stay
1697                    around until the end of the transaction */
1698                 talloc_steal(la_entry->la, la_entry->la->identifier);
1699                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1700
1701                 DLIST_ADD(replmd_private->la_list, la_entry);
1702         }
1703
1704         return replmd_replicated_apply_next(ar);
1705 }
1706
1707 /*
1708   process one linked attribute structure
1709  */
1710 static int replmd_process_linked_attribute(struct ldb_module *module,
1711                                            struct la_entry *la_entry)
1712 {                                          
1713         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1714         struct ldb_context *ldb = ldb_module_get_ctx(module);
1715         struct drsuapi_DsReplicaObjectIdentifier3 target;
1716         struct ldb_message *msg;
1717         struct ldb_message_element *ret_el;
1718         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1719         enum ndr_err_code ndr_err;
1720         char *target_dn;
1721         struct ldb_request *mod_req;
1722         int ret;
1723         const struct dsdb_attribute *attr;
1724
1725 /*
1726 linked_attributes[0]:                                                     
1727      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1728         identifier               : *                                      
1729             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1730                 __ndr_size               : 0x0000003a (58)                
1731                 __ndr_size_sid           : 0x00000000 (0)                 
1732                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1733                 sid                      : S-0-0                               
1734                 __ndr_size_dn            : 0x00000000 (0)                      
1735                 dn                       : ''                                  
1736         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1737         value: struct drsuapi_DsAttributeValue                                 
1738             __ndr_size               : 0x0000007e (126)                        
1739             blob                     : *                                       
1740                 blob                     : DATA_BLOB length=126                
1741         flags                    : 0x00000001 (1)                              
1742                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1743         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1744         meta_data: struct drsuapi_DsReplicaMetaData                            
1745             version                  : 0x00000015 (21)                         
1746             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
1747             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
1748             originating_usn          : 0x000000000001e19c (123292)             
1749      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
1750         __ndr_size               : 0x0000007e (126)                            
1751         __ndr_size_sid           : 0x0000001c (28)                             
1752         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
1753         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
1754         __ndr_size_dn            : 0x00000022 (34)                                
1755         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
1756  */
1757         if (DEBUGLVL(4)) {
1758                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
1759         }
1760         
1761         /* decode the target of the link */
1762         ndr_err = ndr_pull_struct_blob(la->value.blob, 
1763                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1764                                        &target,
1765                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
1766         if (ndr_err != NDR_ERR_SUCCESS) {
1767                 DEBUG(0,("Unable to decode linked_attribute target\n"));
1768                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
1769                 talloc_free(tmp_ctx);
1770                 return LDB_ERR_OPERATIONS_ERROR;
1771         }
1772         if (DEBUGLVL(4)) {
1773                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
1774         }
1775
1776         /* construct a modify request for this attribute change */
1777         msg = ldb_msg_new(tmp_ctx);
1778         if (!msg) {
1779                 ldb_oom(ldb);
1780                 talloc_free(tmp_ctx);
1781                 return LDB_ERR_OPERATIONS_ERROR;
1782         }
1783
1784         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
1785                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
1786         if (ret != LDB_SUCCESS) {
1787                 talloc_free(tmp_ctx);
1788                 return ret;
1789         }
1790
1791         /* find the attribute being modified */
1792         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
1793         if (attr == NULL) {
1794                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
1795                 talloc_free(tmp_ctx);
1796                 return LDB_ERR_OPERATIONS_ERROR;
1797         }
1798
1799         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
1800                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1801                                         LDB_FLAG_MOD_ADD, &ret_el);
1802         } else {
1803                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1804                                         LDB_FLAG_MOD_DELETE, &ret_el);
1805         }
1806         if (ret != LDB_SUCCESS) {
1807                 talloc_free(tmp_ctx);
1808                 return ret;
1809         }
1810         ret_el->values = talloc_array(msg, struct ldb_val, 1);
1811         if (!ret_el->values) {
1812                 ldb_oom(ldb);
1813                 talloc_free(tmp_ctx);
1814                 return LDB_ERR_OPERATIONS_ERROR;
1815         }
1816         ret_el->num_values = 1;
1817
1818         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
1819                                     GUID_string(tmp_ctx, &target.guid),
1820                                     dom_sid_string(tmp_ctx, &target.sid),
1821                                     target.dn);
1822         if (target_dn == NULL) {
1823                 ldb_oom(ldb);
1824                 talloc_free(tmp_ctx);
1825                 return LDB_ERR_OPERATIONS_ERROR;
1826         }
1827         ret_el->values[0] = data_blob_string_const(target_dn);
1828
1829         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
1830                                 msg,
1831                                 NULL,
1832                                 NULL, 
1833                                 ldb_op_default_callback,
1834                                 NULL);
1835         if (ret != LDB_SUCCESS) {
1836                 talloc_free(tmp_ctx);
1837                 return ret;
1838         }
1839         talloc_steal(mod_req, msg);
1840
1841         if (DEBUGLVL(4)) {
1842                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
1843                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
1844         }
1845
1846         /* Run the new request */
1847         ret = ldb_next_request(module, mod_req);
1848
1849         /* we need to wait for this to finish, as we are being called
1850            from the synchronous end_transaction hook of this module */
1851         if (ret == LDB_SUCCESS) {
1852                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
1853         }
1854
1855         if (ret != LDB_SUCCESS) {
1856                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
1857                           ldb_errstring(ldb),
1858                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
1859         }
1860         
1861         talloc_free(tmp_ctx);
1862
1863         return ret;     
1864 }
1865
1866 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1867 {
1868         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1869                 return replmd_extended_replicated_objects(module, req);
1870         }
1871
1872         return ldb_next_request(module, req);
1873 }
1874
1875
1876 /*
1877   we hook into the transaction operations to allow us to 
1878   perform the linked attribute updates at the end of the whole
1879   transaction. This allows a forward linked attribute to be created
1880   before the object is created. During a vampire, w2k8 sends us linked
1881   attributes before the objects they are part of.
1882  */
1883 static int replmd_start_transaction(struct ldb_module *module)
1884 {
1885         /* create our private structure for this transaction */
1886         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
1887                                                                 struct replmd_private);
1888         talloc_free(replmd_private);
1889         replmd_private = talloc(module, struct replmd_private);
1890         if (replmd_private == NULL) {
1891                 return LDB_ERR_OPERATIONS_ERROR;
1892         }
1893         replmd_private->la_list = NULL;
1894         ldb_module_set_private(module, replmd_private);
1895         return ldb_next_start_trans(module);
1896 }
1897
1898 /*
1899   on prepare commit we loop over our queued la_context structures and
1900   apply each of them  
1901  */
1902 static int replmd_prepare_commit(struct ldb_module *module)
1903 {
1904         struct replmd_private *replmd_private = 
1905                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1906         struct la_entry *la;
1907
1908         /* walk the list backwards, to do the first entry first, as we
1909          * added the entries with DLIST_ADD() which puts them at the
1910          * start of the list */
1911         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
1912
1913         for (; la; la=la->prev) {
1914                 int ret;
1915                 ret = replmd_process_linked_attribute(module, la);
1916                 if (ret != LDB_SUCCESS) {
1917                         return ret;
1918                 }
1919         }
1920
1921         talloc_free(replmd_private);
1922         ldb_module_set_private(module, NULL);
1923         
1924         return ldb_next_prepare_commit(module);
1925 }
1926
1927 static int replmd_del_transaction(struct ldb_module *module)
1928 {
1929         struct replmd_private *replmd_private = 
1930                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1931         talloc_free(replmd_private);
1932         ldb_module_set_private(module, NULL);
1933         return ldb_next_del_trans(module);
1934 }
1935
1936
1937 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
1938         .name          = "repl_meta_data",
1939         .add           = replmd_add,
1940         .modify        = replmd_modify,
1941         .extended      = replmd_extended,
1942         .start_transaction = replmd_start_transaction,
1943         .prepare_commit    = replmd_prepare_commit,
1944         .del_transaction   = replmd_del_transaction,
1945 };