Merge branch 'master' of /home/tridge/samba/git/combined
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2008
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb repl_meta_data module
31  *
32  *  Description: - add a unique objectGUID onto every new record,
33  *               - handle whenCreated, whenChanged timestamps
34  *               - handle uSNCreated, uSNChanged numbers
35  *               - handle replPropertyMetaData attribute
36  *
37  *  Author: Simo Sorce
38  *  Author: Stefan Metzmacher
39  */
40
41 #include "includes.h"
42 #include "ldb_module.h"
43 #include "dsdb/samdb/samdb.h"
44 #include "dsdb/common/proto.h"
45 #include "../libds/common/flags.h"
46 #include "librpc/gen_ndr/ndr_misc.h"
47 #include "librpc/gen_ndr/ndr_drsuapi.h"
48 #include "librpc/gen_ndr/ndr_drsblobs.h"
49 #include "param/param.h"
50 #include "libcli/security/dom_sid.h"
51 #include "lib/util/dlinklist.h"
52
53 struct replmd_private {
54         struct la_entry *la_list;
55 };
56
57 struct la_entry {
58         struct la_entry *next, *prev;
59         struct drsuapi_DsReplicaLinkedAttribute *la;
60 };
61
62 struct replmd_replicated_request {
63         struct ldb_module *module;
64         struct ldb_request *req;
65
66         const struct dsdb_schema *schema;
67
68         struct dsdb_extended_replicated_objects *objs;
69
70         /* the controls we pass down */
71         struct ldb_control **controls;
72
73         uint32_t index_current;
74
75         struct ldb_message *search_msg;
76 };
77
78 /*
79   created a replmd_replicated_request context
80  */
81 static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *module,
82                                                          struct ldb_request *req)
83 {
84         struct ldb_context *ldb;
85         struct replmd_replicated_request *ac;
86
87         ldb = ldb_module_get_ctx(module);
88
89         ac = talloc_zero(req, struct replmd_replicated_request);
90         if (ac == NULL) {
91                 ldb_oom(ldb);
92                 return NULL;
93         }
94
95         ac->module = module;
96         ac->req = req;
97         return ac;
98 }
99
100 /*
101   add a time element to a record
102 */
103 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
104 {
105         struct ldb_message_element *el;
106         char *s;
107
108         if (ldb_msg_find_element(msg, attr) != NULL) {
109                 return LDB_SUCCESS;
110         }
111
112         s = ldb_timestring(msg, t);
113         if (s == NULL) {
114                 return LDB_ERR_OPERATIONS_ERROR;
115         }
116
117         if (ldb_msg_add_string(msg, attr, s) != LDB_SUCCESS) {
118                 return LDB_ERR_OPERATIONS_ERROR;
119         }
120
121         el = ldb_msg_find_element(msg, attr);
122         /* always set as replace. This works because on add ops, the flag
123            is ignored */
124         el->flags = LDB_FLAG_MOD_REPLACE;
125
126         return LDB_SUCCESS;
127 }
128
129 /*
130   add a uint64_t element to a record
131 */
132 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
133 {
134         struct ldb_message_element *el;
135
136         if (ldb_msg_find_element(msg, attr) != NULL) {
137                 return LDB_SUCCESS;
138         }
139
140         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != LDB_SUCCESS) {
141                 return LDB_ERR_OPERATIONS_ERROR;
142         }
143
144         el = ldb_msg_find_element(msg, attr);
145         /* always set as replace. This works because on add ops, the flag
146            is ignored */
147         el->flags = LDB_FLAG_MOD_REPLACE;
148
149         return LDB_SUCCESS;
150 }
151
152 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
153                                                    const struct replPropertyMetaData1 *m2,
154                                                    const uint32_t *rdn_attid)
155 {
156         if (m1->attid == m2->attid) {
157                 return 0;
158         }
159
160         /*
161          * the rdn attribute should be at the end!
162          * so we need to return a value greater than zero
163          * which means m1 is greater than m2
164          */
165         if (m1->attid == *rdn_attid) {
166                 return 1;
167         }
168
169         /*
170          * the rdn attribute should be at the end!
171          * so we need to return a value less than zero
172          * which means m2 is greater than m1
173          */
174         if (m2->attid == *rdn_attid) {
175                 return -1;
176         }
177
178         return m1->attid - m2->attid;
179 }
180
181 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
182                                                  const uint32_t *rdn_attid)
183 {
184         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
185                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
186 }
187
188 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
189                                                  const struct ldb_message_element *e2,
190                                                  const struct dsdb_schema *schema)
191 {
192         const struct dsdb_attribute *a1;
193         const struct dsdb_attribute *a2;
194
195         /* 
196          * TODO: make this faster by caching the dsdb_attribute pointer
197          *       on the ldb_messag_element
198          */
199
200         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
201         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
202
203         /*
204          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
205          *       in the schema
206          */
207         if (!a1 || !a2) {
208                 return strcasecmp(e1->name, e2->name);
209         }
210
211         return a1->attributeID_id - a2->attributeID_id;
212 }
213
214 static void replmd_ldb_message_sort(struct ldb_message *msg,
215                                     const struct dsdb_schema *schema)
216 {
217         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
218                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
219 }
220
221 static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
222 {
223         struct ldb_context *ldb;
224         struct replmd_replicated_request *ac;
225
226         ac = talloc_get_type(req->context, struct replmd_replicated_request);
227         ldb = ldb_module_get_ctx(ac->module);
228
229         if (!ares) {
230                 return ldb_module_done(ac->req, NULL, NULL,
231                                         LDB_ERR_OPERATIONS_ERROR);
232         }
233         if (ares->error != LDB_SUCCESS) {
234                 return ldb_module_done(ac->req, ares->controls,
235                                         ares->response, ares->error);
236         }
237
238         if (ares->type != LDB_REPLY_DONE) {
239                 ldb_set_errstring(ldb,
240                                   "invalid ldb_reply_type in callback");
241                 talloc_free(ares);
242                 return ldb_module_done(ac->req, NULL, NULL,
243                                         LDB_ERR_OPERATIONS_ERROR);
244         }
245
246         return ldb_module_done(ac->req, ares->controls,
247                                 ares->response, LDB_SUCCESS);
248 }
249
250 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
251 {
252         struct ldb_context *ldb;
253         struct replmd_replicated_request *ac;
254         const struct dsdb_schema *schema;
255         enum ndr_err_code ndr_err;
256         struct ldb_request *down_req;
257         struct ldb_message *msg;
258         const struct dsdb_attribute *rdn_attr = NULL;
259         struct GUID guid;
260         struct ldb_val guid_value;
261         struct replPropertyMetaDataBlob nmd;
262         struct ldb_val nmd_value;
263         uint64_t seq_num;
264         const struct GUID *our_invocation_id;
265         time_t t = time(NULL);
266         NTTIME now;
267         char *time_str;
268         int ret;
269         uint32_t i, ni=0;
270
271         /* do not manipulate our control entries */
272         if (ldb_dn_is_special(req->op.add.message->dn)) {
273                 return ldb_next_request(module, req);
274         }
275
276         ldb = ldb_module_get_ctx(module);
277
278         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
279
280         schema = dsdb_get_schema(ldb);
281         if (!schema) {
282                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
283                               "replmd_add: no dsdb_schema loaded");
284                 return LDB_ERR_CONSTRAINT_VIOLATION;
285         }
286
287         ac = replmd_ctx_init(module, req);
288         if (!ac) {
289                 return LDB_ERR_OPERATIONS_ERROR;
290         }
291
292         ac->schema = schema;
293
294         if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
295                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
296                               "replmd_add: it's not allowed to add an object with objectGUID\n");
297                 return LDB_ERR_UNWILLING_TO_PERFORM;
298         }
299
300         /* Get a sequence number from the backend */
301         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
302         if (ret != LDB_SUCCESS) {
303                 return ret;
304         }
305
306         /* a new GUID */
307         guid = GUID_random();
308
309         /* get our invocationId */
310         our_invocation_id = samdb_ntds_invocation_id(ldb);
311         if (!our_invocation_id) {
312                 ldb_debug_set(ldb, LDB_DEBUG_ERROR,
313                               "replmd_add: unable to find invocationId\n");
314                 return LDB_ERR_OPERATIONS_ERROR;
315         }
316
317         /* we have to copy the message as the caller might have it as a const */
318         msg = ldb_msg_copy_shallow(ac, req->op.add.message);
319         if (msg == NULL) {
320                 ldb_oom(ldb);
321                 return LDB_ERR_OPERATIONS_ERROR;
322         }
323
324         /* generated times */
325         unix_to_nt_time(&now, t);
326         time_str = ldb_timestring(msg, t);
327         if (!time_str) {
328                 return LDB_ERR_OPERATIONS_ERROR;
329         }
330
331         /* 
332          * remove autogenerated attributes
333          */
334         ldb_msg_remove_attr(msg, "whenCreated");
335         ldb_msg_remove_attr(msg, "whenChanged");
336         ldb_msg_remove_attr(msg, "uSNCreated");
337         ldb_msg_remove_attr(msg, "uSNChanged");
338         ldb_msg_remove_attr(msg, "replPropertyMetaData");
339
340         /*
341          * readd replicated attributes
342          */
343         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
344         if (ret != LDB_SUCCESS) {
345                 ldb_oom(ldb);
346                 return LDB_ERR_OPERATIONS_ERROR;
347         }
348
349         /* build the replication meta_data */
350         ZERO_STRUCT(nmd);
351         nmd.version             = 1;
352         nmd.ctr.ctr1.count      = msg->num_elements;
353         nmd.ctr.ctr1.array      = talloc_array(msg,
354                                                struct replPropertyMetaData1,
355                                                nmd.ctr.ctr1.count);
356         if (!nmd.ctr.ctr1.array) {
357                 ldb_oom(ldb);
358                 return LDB_ERR_OPERATIONS_ERROR;
359         }
360
361         for (i=0; i < msg->num_elements; i++) {
362                 struct ldb_message_element *e = &msg->elements[i];
363                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
364                 const struct dsdb_attribute *sa;
365
366                 if (e->name[0] == '@') continue;
367
368                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
369                 if (!sa) {
370                         ldb_debug_set(ldb, LDB_DEBUG_ERROR,
371                                       "replmd_add: attribute '%s' not defined in schema\n",
372                                       e->name);
373                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
374                 }
375
376                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
377                         /* if the attribute is not replicated (0x00000001)
378                          * or constructed (0x00000004) it has no metadata
379                          */
380                         continue;
381                 }
382
383                 m->attid                        = sa->attributeID_id;
384                 m->version                      = 1;
385                 m->originating_change_time      = now;
386                 m->originating_invocation_id    = *our_invocation_id;
387                 m->originating_usn              = seq_num;
388                 m->local_usn                    = seq_num;
389                 ni++;
390
391                 if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
392                         rdn_attr = sa;
393                 }
394         }
395
396         /* fix meta data count */
397         nmd.ctr.ctr1.count = ni;
398
399         /*
400          * sort meta data array, and move the rdn attribute entry to the end
401          */
402         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
403
404         /* generated NDR encoded values */
405         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
406                                        NULL,
407                                        &guid,
408                                        (ndr_push_flags_fn_t)ndr_push_GUID);
409         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
410                 ldb_oom(ldb);
411                 return LDB_ERR_OPERATIONS_ERROR;
412         }
413         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
414                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
415                                        &nmd,
416                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
417         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
418                 ldb_oom(ldb);
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421
422         /*
423          * add the autogenerated values
424          */
425         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
426         if (ret != LDB_SUCCESS) {
427                 ldb_oom(ldb);
428                 return LDB_ERR_OPERATIONS_ERROR;
429         }
430         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
431         if (ret != LDB_SUCCESS) {
432                 ldb_oom(ldb);
433                 return LDB_ERR_OPERATIONS_ERROR;
434         }
435         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
436         if (ret != LDB_SUCCESS) {
437                 ldb_oom(ldb);
438                 return LDB_ERR_OPERATIONS_ERROR;
439         }
440         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
441         if (ret != LDB_SUCCESS) {
442                 ldb_oom(ldb);
443                 return LDB_ERR_OPERATIONS_ERROR;
444         }
445         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
446         if (ret != LDB_SUCCESS) {
447                 ldb_oom(ldb);
448                 return LDB_ERR_OPERATIONS_ERROR;
449         }
450
451         /*
452          * sort the attributes by attid before storing the object
453          */
454         replmd_ldb_message_sort(msg, schema);
455
456         ret = ldb_build_add_req(&down_req, ldb, ac,
457                                 msg,
458                                 req->controls,
459                                 ac, replmd_op_callback,
460                                 req);
461         if (ret != LDB_SUCCESS) {
462                 return ret;
463         }
464
465         /* go on with the call chain */
466         return ldb_next_request(module, down_req);
467 }
468
469
470 /*
471  * update the replPropertyMetaData for one element
472  */
473 static int replmd_update_rpmd_element(struct ldb_context *ldb, 
474                                       struct ldb_message *msg,
475                                       struct ldb_message_element *el,
476                                       struct replPropertyMetaDataBlob *omd,
477                                       struct dsdb_schema *schema,
478                                       uint64_t *seq_num,
479                                       const struct GUID *our_invocation_id,
480                                       NTTIME now)
481 {
482         int i;
483         const struct dsdb_attribute *a;
484         struct replPropertyMetaData1 *md1;
485
486         a = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
487         if (a == NULL) {
488                 DEBUG(0,(__location__ ": Unable to find attribute %s in schema\n",
489                          el->name));
490                 return LDB_ERR_OPERATIONS_ERROR;
491         }
492
493         if ((a->systemFlags & 0x00000001) || (a->systemFlags & 0x00000004)) {
494                 /* if the attribute is not replicated (0x00000001)
495                  * or constructed (0x00000004) it has no metadata
496                  */
497                 return LDB_SUCCESS;
498         }
499
500         for (i=0; i<omd->ctr.ctr1.count; i++) {
501                 if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
502         }
503         if (i == omd->ctr.ctr1.count) {
504                 /* we need to add a new one */
505                 omd->ctr.ctr1.array = talloc_realloc(msg, omd->ctr.ctr1.array, 
506                                                      struct replPropertyMetaData1, omd->ctr.ctr1.count+1);
507                 if (omd->ctr.ctr1.array == NULL) {
508                         ldb_oom(ldb);
509                         return LDB_ERR_OPERATIONS_ERROR;
510                 }
511                 omd->ctr.ctr1.count++;
512         }
513
514         /* Get a new sequence number from the backend. We only do this
515          * if we have a change that requires a new
516          * replPropertyMetaData element 
517          */
518         if (*seq_num == 0) {
519                 int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
520                 if (ret != LDB_SUCCESS) {
521                         return LDB_ERR_OPERATIONS_ERROR;
522                 }
523         }
524
525         md1 = &omd->ctr.ctr1.array[i];
526         md1->version                   = 1;
527         md1->attid                     = a->attributeID_id;
528         md1->originating_change_time   = now;
529         md1->originating_invocation_id = *our_invocation_id;
530         md1->originating_usn           = *seq_num;
531         md1->local_usn                 = *seq_num;
532         
533         return LDB_SUCCESS;
534 }
535
536 /*
537  * update the replPropertyMetaData object each time we modify an
538  * object. This is needed for DRS replication, as the merge on the
539  * client is based on this object 
540  */
541 static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
542                               uint64_t *seq_num)
543 {
544         const struct ldb_val *omd_value;
545         enum ndr_err_code ndr_err;
546         struct replPropertyMetaDataBlob omd;
547         int i;
548         struct dsdb_schema *schema;
549         time_t t = time(NULL);
550         NTTIME now;
551         const struct GUID *our_invocation_id;
552         int ret;
553         const char *attrs[] = { "replPropertyMetaData" , NULL };
554         struct ldb_result *res;
555
556         our_invocation_id = samdb_ntds_invocation_id(ldb);
557         if (!our_invocation_id) {
558                 /* this happens during an initial vampire while
559                    updating the schema */
560                 DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
561                 return LDB_SUCCESS;
562         }
563
564         unix_to_nt_time(&now, t);
565
566         /* search for the existing replPropertyMetaDataBlob */
567         ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
568         if (ret != LDB_SUCCESS || res->count < 1) {
569                 DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
570                          ldb_dn_get_linearized(msg->dn)));
571                 return LDB_ERR_OPERATIONS_ERROR;
572         }
573                 
574
575         omd_value = ldb_msg_find_ldb_val(res->msgs[0], "replPropertyMetaData");
576         if (!omd_value) {
577                 DEBUG(0,(__location__ ": Object %s does not have a replPropertyMetaData attribute\n",
578                          ldb_dn_get_linearized(msg->dn)));
579                 return LDB_ERR_OPERATIONS_ERROR;
580         }
581
582         ndr_err = ndr_pull_struct_blob(omd_value, msg,
583                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
584                                        (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
585         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
586                 DEBUG(0,(__location__ ": Failed to parse replPropertyMetaData for %s\n",
587                          ldb_dn_get_linearized(msg->dn)));
588                 return LDB_ERR_OPERATIONS_ERROR;
589         }
590
591         if (omd.version != 1) {
592                 DEBUG(0,(__location__ ": bad version %u in replPropertyMetaData for %s\n",
593                          omd.version, ldb_dn_get_linearized(msg->dn)));
594                 return LDB_ERR_OPERATIONS_ERROR;
595         }
596
597         schema = dsdb_get_schema(ldb);
598
599         for (i=0; i<msg->num_elements; i++) {
600                 ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
601                                                  our_invocation_id, now);
602                 if (ret != LDB_SUCCESS) {
603                         return ret;
604                 }
605         }
606
607         /*
608          * replmd_update_rpmd_element has done an update if the
609          * seq_num is set
610          */
611         if (*seq_num != 0) {
612                 struct ldb_val *md_value;
613                 struct ldb_message_element *el;
614
615                 md_value = talloc(msg, struct ldb_val);
616                 if (md_value == NULL) {
617                         ldb_oom(ldb);
618                         return LDB_ERR_OPERATIONS_ERROR;
619                 }
620
621                 ndr_err = ndr_push_struct_blob(md_value, msg, 
622                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
623                                                &omd,
624                                                (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
625                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
626                         DEBUG(0,(__location__ ": Failed to marshall replPropertyMetaData for %s\n",
627                                  ldb_dn_get_linearized(msg->dn)));
628                         return LDB_ERR_OPERATIONS_ERROR;
629                 }
630
631                 ret = ldb_msg_add_empty(msg, "replPropertyMetaData", LDB_FLAG_MOD_REPLACE, &el);
632                 if (ret != LDB_SUCCESS) {
633                         DEBUG(0,(__location__ ": Failed to add updated replPropertyMetaData %s\n",
634                                  ldb_dn_get_linearized(msg->dn)));
635                         return ret;
636                 }
637
638                 el->num_values = 1;
639                 el->values = md_value;
640         }
641
642         return LDB_SUCCESS;     
643 }
644
645
646 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
647 {
648         struct ldb_context *ldb;
649         struct replmd_replicated_request *ac;
650         const struct dsdb_schema *schema;
651         struct ldb_request *down_req;
652         struct ldb_message *msg;
653         int ret;
654         time_t t = time(NULL);
655         uint64_t seq_num = 0;
656
657         /* do not manipulate our control entries */
658         if (ldb_dn_is_special(req->op.mod.message->dn)) {
659                 return ldb_next_request(module, req);
660         }
661
662         ldb = ldb_module_get_ctx(module);
663
664         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
665
666         schema = dsdb_get_schema(ldb);
667         if (!schema) {
668                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
669                               "replmd_modify: no dsdb_schema loaded");
670                 return LDB_ERR_CONSTRAINT_VIOLATION;
671         }
672
673         ac = replmd_ctx_init(module, req);
674         if (!ac) {
675                 return LDB_ERR_OPERATIONS_ERROR;
676         }
677
678         ac->schema = schema;
679
680         /* we have to copy the message as the caller might have it as a const */
681         msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
682         if (msg == NULL) {
683                 talloc_free(ac);
684                 return LDB_ERR_OPERATIONS_ERROR;
685         }
686
687         /* TODO:
688          * - get the whole old object
689          * - if the old object doesn't exist report an error
690          * - give an error when a readonly attribute should
691          *   be modified
692          * - merge the changed into the old object
693          *   if the caller set values to the same value
694          *   ignore the attribute, return success when no
695          *   attribute was changed
696          */
697
698         ret = replmd_update_rpmd(ldb, msg, &seq_num);
699         if (ret != LDB_SUCCESS) {
700                 return ret;
701         }
702
703         /* TODO:
704          * - sort the attributes by attid with replmd_ldb_message_sort()
705          * - replace the old object with the newly constructed one
706          */
707
708         ret = ldb_build_mod_req(&down_req, ldb, ac,
709                                 msg,
710                                 req->controls,
711                                 ac, replmd_op_callback,
712                                 req);
713         if (ret != LDB_SUCCESS) {
714                 return ret;
715         }
716         talloc_steal(down_req, msg);
717
718         /* we only change whenChanged and uSNChanged if the seq_num
719            has changed */
720         if (seq_num != 0) {
721                 if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
722                         talloc_free(ac);
723                         return LDB_ERR_OPERATIONS_ERROR;
724                 }
725
726                 if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
727                         talloc_free(ac);
728                         return LDB_ERR_OPERATIONS_ERROR;
729                 }
730         }
731
732         /* go on with the call chain */
733         return ldb_next_request(module, down_req);
734 }
735
736 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
737 {
738         return ret;
739 }
740
741 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
742 {
743         int ret = LDB_ERR_OTHER;
744         /* TODO: do some error mapping */
745         return ret;
746 }
747
748 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
749
750 static int replmd_replicated_apply_add_callback(struct ldb_request *req,
751                                                 struct ldb_reply *ares)
752 {
753         struct ldb_context *ldb;
754         struct replmd_replicated_request *ar = talloc_get_type(req->context,
755                                                struct replmd_replicated_request);
756         int ret;
757
758         ldb = ldb_module_get_ctx(ar->module);
759
760         if (!ares) {
761                 return ldb_module_done(ar->req, NULL, NULL,
762                                         LDB_ERR_OPERATIONS_ERROR);
763         }
764         if (ares->error != LDB_SUCCESS) {
765                 return ldb_module_done(ar->req, ares->controls,
766                                         ares->response, ares->error);
767         }
768
769         if (ares->type != LDB_REPLY_DONE) {
770                 ldb_set_errstring(ldb, "Invalid reply type\n!");
771                 return ldb_module_done(ar->req, NULL, NULL,
772                                         LDB_ERR_OPERATIONS_ERROR);
773         }
774
775         talloc_free(ares);
776         ar->index_current++;
777
778         ret = replmd_replicated_apply_next(ar);
779         if (ret != LDB_SUCCESS) {
780                 return ldb_module_done(ar->req, NULL, NULL, ret);
781         }
782
783         return LDB_SUCCESS;
784 }
785
786 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
787 {
788         struct ldb_context *ldb;
789         struct ldb_request *change_req;
790         enum ndr_err_code ndr_err;
791         struct ldb_message *msg;
792         struct replPropertyMetaDataBlob *md;
793         struct ldb_val md_value;
794         uint32_t i;
795         uint64_t seq_num;
796         int ret;
797
798         /*
799          * TODO: check if the parent object exist
800          */
801
802         /*
803          * TODO: handle the conflict case where an object with the
804          *       same name exist
805          */
806
807         ldb = ldb_module_get_ctx(ar->module);
808         msg = ar->objs->objects[ar->index_current].msg;
809         md = ar->objs->objects[ar->index_current].meta_data;
810
811         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
812         if (ret != LDB_SUCCESS) {
813                 return replmd_replicated_request_error(ar, ret);
814         }
815
816         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
817         if (ret != LDB_SUCCESS) {
818                 return replmd_replicated_request_error(ar, ret);
819         }
820
821         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
822         if (ret != LDB_SUCCESS) {
823                 return replmd_replicated_request_error(ar, ret);
824         }
825
826         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
827         if (ret != LDB_SUCCESS) {
828                 return replmd_replicated_request_error(ar, ret);
829         }
830
831         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
832         if (ret != LDB_SUCCESS) {
833                 return replmd_replicated_request_error(ar, ret);
834         }
835
836         /*
837          * the meta data array is already sorted by the caller
838          */
839         for (i=0; i < md->ctr.ctr1.count; i++) {
840                 md->ctr.ctr1.array[i].local_usn = seq_num;
841         }
842         ndr_err = ndr_push_struct_blob(&md_value, msg, 
843                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
844                                        md,
845                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
846         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
847                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
848                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
849         }
850         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
851         if (ret != LDB_SUCCESS) {
852                 return replmd_replicated_request_error(ar, ret);
853         }
854
855         replmd_ldb_message_sort(msg, ar->schema);
856
857         if (DEBUGLVL(4)) {
858                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
859                 DEBUG(4, ("DRS replication add message:\n%s\n", s));
860                 talloc_free(s);
861         }
862
863         ret = ldb_build_add_req(&change_req,
864                                 ldb,
865                                 ar,
866                                 msg,
867                                 ar->controls,
868                                 ar,
869                                 replmd_replicated_apply_add_callback,
870                                 ar->req);
871         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
872
873         return ldb_next_request(ar->module, change_req);
874 }
875
876 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
877                                                          struct replPropertyMetaData1 *m2)
878 {
879         int ret;
880
881         if (m1->version != m2->version) {
882                 return m1->version - m2->version;
883         }
884
885         if (m1->originating_change_time != m2->originating_change_time) {
886                 return m1->originating_change_time - m2->originating_change_time;
887         }
888
889         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
890         if (ret != 0) {
891                 return ret;
892         }
893
894         return m1->originating_usn - m2->originating_usn;
895 }
896
897 static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
898                                                   struct ldb_reply *ares)
899 {
900         struct ldb_context *ldb;
901         struct replmd_replicated_request *ar = talloc_get_type(req->context,
902                                                struct replmd_replicated_request);
903         int ret;
904
905         ldb = ldb_module_get_ctx(ar->module);
906
907         if (!ares) {
908                 return ldb_module_done(ar->req, NULL, NULL,
909                                         LDB_ERR_OPERATIONS_ERROR);
910         }
911         if (ares->error != LDB_SUCCESS) {
912                 return ldb_module_done(ar->req, ares->controls,
913                                         ares->response, ares->error);
914         }
915
916         if (ares->type != LDB_REPLY_DONE) {
917                 ldb_set_errstring(ldb, "Invalid reply type\n!");
918                 return ldb_module_done(ar->req, NULL, NULL,
919                                         LDB_ERR_OPERATIONS_ERROR);
920         }
921
922         talloc_free(ares);
923         ar->index_current++;
924
925         ret = replmd_replicated_apply_next(ar);
926         if (ret != LDB_SUCCESS) {
927                 return ldb_module_done(ar->req, NULL, NULL, ret);
928         }
929
930         return LDB_SUCCESS;
931 }
932
933 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
934 {
935         struct ldb_context *ldb;
936         struct ldb_request *change_req;
937         enum ndr_err_code ndr_err;
938         struct ldb_message *msg;
939         struct replPropertyMetaDataBlob *rmd;
940         struct replPropertyMetaDataBlob omd;
941         const struct ldb_val *omd_value;
942         struct replPropertyMetaDataBlob nmd;
943         struct ldb_val nmd_value;
944         uint32_t i,j,ni=0;
945         uint32_t removed_attrs = 0;
946         uint64_t seq_num;
947         int ret;
948
949         ldb = ldb_module_get_ctx(ar->module);
950         msg = ar->objs->objects[ar->index_current].msg;
951         rmd = ar->objs->objects[ar->index_current].meta_data;
952         ZERO_STRUCT(omd);
953         omd.version = 1;
954
955         /*
956          * TODO: check repl data is correct after a rename
957          */
958         if (ldb_dn_compare(msg->dn, ar->search_msg->dn) != 0) {
959                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_request rename %s => %s\n",
960                           ldb_dn_get_linearized(ar->search_msg->dn),
961                           ldb_dn_get_linearized(msg->dn));
962                 if (ldb_rename(ldb, ar->search_msg->dn, msg->dn) != LDB_SUCCESS) {
963                         ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_replicated_request rename %s => %s failed - %s\n",
964                                   ldb_dn_get_linearized(ar->search_msg->dn),
965                                   ldb_dn_get_linearized(msg->dn),
966                                   ldb_errstring(ldb));
967                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_DB_ERROR);
968                 }
969         }
970
971         ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
972         if (ret != LDB_SUCCESS) {
973                 return replmd_replicated_request_error(ar, ret);
974         }
975
976         /* find existing meta data */
977         omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
978         if (omd_value) {
979                 ndr_err = ndr_pull_struct_blob(omd_value, ar,
980                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &omd,
981                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
982                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
983                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
984                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
985                 }
986
987                 if (omd.version != 1) {
988                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
989                 }
990         }
991
992         ZERO_STRUCT(nmd);
993         nmd.version = 1;
994         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
995         nmd.ctr.ctr1.array = talloc_array(ar,
996                                           struct replPropertyMetaData1,
997                                           nmd.ctr.ctr1.count);
998         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
999
1000         /* first copy the old meta data */
1001         for (i=0; i < omd.ctr.ctr1.count; i++) {
1002                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
1003                 ni++;
1004         }
1005
1006         /* now merge in the new meta data */
1007         for (i=0; i < rmd->ctr.ctr1.count; i++) {
1008                 bool found = false;
1009
1010                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
1011
1012                 for (j=0; j < ni; j++) {
1013                         int cmp;
1014
1015                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
1016                                 continue;
1017                         }
1018
1019                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
1020                                                                             &nmd.ctr.ctr1.array[j]);
1021                         if (cmp > 0) {
1022                                 /* replace the entry */
1023                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
1024                                 found = true;
1025                                 break;
1026                         }
1027
1028                         /* we don't want to apply this change so remove the attribute */
1029                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
1030                         removed_attrs++;
1031
1032                         found = true;
1033                         break;
1034                 }
1035
1036                 if (found) continue;
1037
1038                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
1039                 ni++;
1040         }
1041
1042         /*
1043          * finally correct the size of the meta_data array
1044          */
1045         nmd.ctr.ctr1.count = ni;
1046
1047         /*
1048          * the rdn attribute (the alias for the name attribute),
1049          * 'cn' for most objects is the last entry in the meta data array
1050          * we have stored
1051          *
1052          * sort the new meta data array
1053          */
1054         {
1055                 struct replPropertyMetaData1 *rdn_p;
1056                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
1057
1058                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
1059                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
1060         }
1061
1062         /* create the meta data value */
1063         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
1064                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1065                                        &nmd,
1066                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
1067         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1068                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1069                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1070         }
1071
1072         /*
1073          * check if some replicated attributes left, otherwise skip the ldb_modify() call
1074          */
1075         if (msg->num_elements == 0) {
1076                 ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
1077                           ar->index_current);
1078
1079                 ar->index_current++;
1080                 return replmd_replicated_apply_next(ar);
1081         }
1082
1083         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
1084                   ar->index_current, msg->num_elements);
1085
1086         /*
1087          * when we know that we'll modify the record, add the whenChanged, uSNChanged
1088          * and replPopertyMetaData attributes
1089          */
1090         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
1091         if (ret != LDB_SUCCESS) {
1092                 return replmd_replicated_request_error(ar, ret);
1093         }
1094         ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
1095         if (ret != LDB_SUCCESS) {
1096                 return replmd_replicated_request_error(ar, ret);
1097         }
1098         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
1099         if (ret != LDB_SUCCESS) {
1100                 return replmd_replicated_request_error(ar, ret);
1101         }
1102
1103         replmd_ldb_message_sort(msg, ar->schema);
1104
1105         /* we want to replace the old values */
1106         for (i=0; i < msg->num_elements; i++) {
1107                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
1108         }
1109
1110         if (DEBUGLVL(4)) {
1111                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1112                 DEBUG(4, ("DRS replication modify message:\n%s\n", s));
1113                 talloc_free(s);
1114         }
1115
1116         ret = ldb_build_mod_req(&change_req,
1117                                 ldb,
1118                                 ar,
1119                                 msg,
1120                                 ar->controls,
1121                                 ar,
1122                                 replmd_replicated_apply_merge_callback,
1123                                 ar->req);
1124         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1125
1126         return ldb_next_request(ar->module, change_req);
1127 }
1128
1129 static int replmd_replicated_apply_search_callback(struct ldb_request *req,
1130                                                    struct ldb_reply *ares)
1131 {
1132         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1133                                                struct replmd_replicated_request);
1134         int ret;
1135
1136         if (!ares) {
1137                 return ldb_module_done(ar->req, NULL, NULL,
1138                                         LDB_ERR_OPERATIONS_ERROR);
1139         }
1140         if (ares->error != LDB_SUCCESS &&
1141             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1142                 return ldb_module_done(ar->req, ares->controls,
1143                                         ares->response, ares->error);
1144         }
1145
1146         switch (ares->type) {
1147         case LDB_REPLY_ENTRY:
1148                 ar->search_msg = talloc_steal(ar, ares->message);
1149                 break;
1150
1151         case LDB_REPLY_REFERRAL:
1152                 /* we ignore referrals */
1153                 break;
1154
1155         case LDB_REPLY_DONE:
1156                 if (ar->search_msg != NULL) {
1157                         ret = replmd_replicated_apply_merge(ar);
1158                 } else {
1159                         ret = replmd_replicated_apply_add(ar);
1160                 }
1161                 if (ret != LDB_SUCCESS) {
1162                         return ldb_module_done(ar->req, NULL, NULL, ret);
1163                 }
1164         }
1165
1166         talloc_free(ares);
1167         return LDB_SUCCESS;
1168 }
1169
1170 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
1171
1172 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1173 {
1174         struct ldb_context *ldb;
1175         int ret;
1176         char *tmp_str;
1177         char *filter;
1178         struct ldb_request *search_req;
1179
1180         if (ar->index_current >= ar->objs->num_objects) {
1181                 /* done with it, go to next stage */
1182                 return replmd_replicated_uptodate_vector(ar);
1183         }
1184
1185         ldb = ldb_module_get_ctx(ar->module);
1186         ar->search_msg = NULL;
1187
1188         tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].guid_value);
1189         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1190
1191         filter = talloc_asprintf(ar, "(objectGUID=%s)", tmp_str);
1192         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1193         talloc_free(tmp_str);
1194
1195         ret = ldb_build_search_req(&search_req,
1196                                    ldb,
1197                                    ar,
1198                                    ar->objs->partition_dn,
1199                                    LDB_SCOPE_SUBTREE,
1200                                    filter,
1201                                    NULL,
1202                                    NULL,
1203                                    ar,
1204                                    replmd_replicated_apply_search_callback,
1205                                    ar->req);
1206         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1207
1208         return ldb_next_request(ar->module, search_req);
1209 }
1210
1211 static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
1212                                                       struct ldb_reply *ares)
1213 {
1214         struct ldb_context *ldb;
1215         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1216                                                struct replmd_replicated_request);
1217         ldb = ldb_module_get_ctx(ar->module);
1218
1219         if (!ares) {
1220                 return ldb_module_done(ar->req, NULL, NULL,
1221                                         LDB_ERR_OPERATIONS_ERROR);
1222         }
1223         if (ares->error != LDB_SUCCESS) {
1224                 return ldb_module_done(ar->req, ares->controls,
1225                                         ares->response, ares->error);
1226         }
1227
1228         if (ares->type != LDB_REPLY_DONE) {
1229                 ldb_set_errstring(ldb, "Invalid reply type\n!");
1230                 return ldb_module_done(ar->req, NULL, NULL,
1231                                         LDB_ERR_OPERATIONS_ERROR);
1232         }
1233
1234         talloc_free(ares);
1235
1236         return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
1237 }
1238
1239 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1240                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1241 {
1242         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1243 }
1244
1245 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1246 {
1247         struct ldb_context *ldb;
1248         struct ldb_request *change_req;
1249         enum ndr_err_code ndr_err;
1250         struct ldb_message *msg;
1251         struct replUpToDateVectorBlob ouv;
1252         const struct ldb_val *ouv_value;
1253         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1254         struct replUpToDateVectorBlob nuv;
1255         struct ldb_val nuv_value;
1256         struct ldb_message_element *nuv_el = NULL;
1257         const struct GUID *our_invocation_id;
1258         struct ldb_message_element *orf_el = NULL;
1259         struct repsFromToBlob nrf;
1260         struct ldb_val *nrf_value = NULL;
1261         struct ldb_message_element *nrf_el = NULL;
1262         uint32_t i,j,ni=0;
1263         bool found = false;
1264         time_t t = time(NULL);
1265         NTTIME now;
1266         int ret;
1267
1268         ldb = ldb_module_get_ctx(ar->module);
1269         ruv = ar->objs->uptodateness_vector;
1270         ZERO_STRUCT(ouv);
1271         ouv.version = 2;
1272         ZERO_STRUCT(nuv);
1273         nuv.version = 2;
1274
1275         unix_to_nt_time(&now, t);
1276
1277         /*
1278          * first create the new replUpToDateVector
1279          */
1280         ouv_value = ldb_msg_find_ldb_val(ar->search_msg, "replUpToDateVector");
1281         if (ouv_value) {
1282                 ndr_err = ndr_pull_struct_blob(ouv_value, ar,
1283                                                lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), &ouv,
1284                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1285                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1286                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1287                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1288                 }
1289
1290                 if (ouv.version != 2) {
1291                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1292                 }
1293         }
1294
1295         /*
1296          * the new uptodateness vector will at least
1297          * contain 1 entry, one for the source_dsa
1298          *
1299          * plus optional values from our old vector and the one from the source_dsa
1300          */
1301         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1302         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1303         nuv.ctr.ctr2.cursors = talloc_array(ar,
1304                                             struct drsuapi_DsReplicaCursor2,
1305                                             nuv.ctr.ctr2.count);
1306         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1307
1308         /* first copy the old vector */
1309         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1310                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1311                 ni++;
1312         }
1313
1314         /* get our invocation_id if we have one already attached to the ldb */
1315         our_invocation_id = samdb_ntds_invocation_id(ldb);
1316
1317         /* merge in the source_dsa vector is available */
1318         for (i=0; (ruv && i < ruv->count); i++) {
1319                 found = false;
1320
1321                 if (our_invocation_id &&
1322                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1323                                our_invocation_id)) {
1324                         continue;
1325                 }
1326
1327                 for (j=0; j < ni; j++) {
1328                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1329                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1330                                 continue;
1331                         }
1332
1333                         found = true;
1334
1335                         /*
1336                          * we update only the highest_usn and not the latest_sync_success time,
1337                          * because the last success stands for direct replication
1338                          */
1339                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1340                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1341                         }
1342                         break;                  
1343                 }
1344
1345                 if (found) continue;
1346
1347                 /* if it's not there yet, add it */
1348                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1349                 ni++;
1350         }
1351
1352         /*
1353          * merge in the current highwatermark for the source_dsa
1354          */
1355         found = false;
1356         for (j=0; j < ni; j++) {
1357                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1358                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1359                         continue;
1360                 }
1361
1362                 found = true;
1363
1364                 /*
1365                  * here we update the highest_usn and last_sync_success time
1366                  * because we're directly replicating from the source_dsa
1367                  *
1368                  * and use the tmp_highest_usn because this is what we have just applied
1369                  * to our ldb
1370                  */
1371                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1372                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1373                 break;
1374         }
1375         if (!found) {
1376                 /*
1377                  * here we update the highest_usn and last_sync_success time
1378                  * because we're directly replicating from the source_dsa
1379                  *
1380                  * and use the tmp_highest_usn because this is what we have just applied
1381                  * to our ldb
1382                  */
1383                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1384                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1385                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1386                 ni++;
1387         }
1388
1389         /*
1390          * finally correct the size of the cursors array
1391          */
1392         nuv.ctr.ctr2.count = ni;
1393
1394         /*
1395          * sort the cursors
1396          */
1397         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1398               sizeof(struct drsuapi_DsReplicaCursor2),
1399               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1400
1401         /*
1402          * create the change ldb_message
1403          */
1404         msg = ldb_msg_new(ar);
1405         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1406         msg->dn = ar->search_msg->dn;
1407
1408         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1409                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1410                                        &nuv,
1411                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1412         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1413                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1414                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1415         }
1416         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1417         if (ret != LDB_SUCCESS) {
1418                 return replmd_replicated_request_error(ar, ret);
1419         }
1420         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1421
1422         /*
1423          * now create the new repsFrom value from the given repsFromTo1 structure
1424          */
1425         ZERO_STRUCT(nrf);
1426         nrf.version                                     = 1;
1427         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1428         /* and fix some values... */
1429         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1430         nrf.ctr.ctr1.last_success                       = now;
1431         nrf.ctr.ctr1.last_attempt                       = now;
1432         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1433         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1434
1435         /*
1436          * first see if we already have a repsFrom value for the current source dsa
1437          * if so we'll later replace this value
1438          */
1439         orf_el = ldb_msg_find_element(ar->search_msg, "repsFrom");
1440         if (orf_el) {
1441                 for (i=0; i < orf_el->num_values; i++) {
1442                         struct repsFromToBlob *trf;
1443
1444                         trf = talloc(ar, struct repsFromToBlob);
1445                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1446
1447                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), trf,
1448                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1449                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1450                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1451                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1452                         }
1453
1454                         if (trf->version != 1) {
1455                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1456                         }
1457
1458                         /*
1459                          * we compare the source dsa objectGUID not the invocation_id
1460                          * because we want only one repsFrom value per source dsa
1461                          * and when the invocation_id of the source dsa has changed we don't need 
1462                          * the old repsFrom with the old invocation_id
1463                          */
1464                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1465                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1466                                 talloc_free(trf);
1467                                 continue;
1468                         }
1469
1470                         talloc_free(trf);
1471                         nrf_value = &orf_el->values[i];
1472                         break;
1473                 }
1474
1475                 /*
1476                  * copy over all old values to the new ldb_message
1477                  */
1478                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1479                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1480                 *nrf_el = *orf_el;
1481         }
1482
1483         /*
1484          * if we haven't found an old repsFrom value for the current source dsa
1485          * we'll add a new value
1486          */
1487         if (!nrf_value) {
1488                 struct ldb_val zero_value;
1489                 ZERO_STRUCT(zero_value);
1490                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1491                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1492
1493                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1494         }
1495
1496         /* we now fill the value which is already attached to ldb_message */
1497         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1498                                        lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
1499                                        &nrf,
1500                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1501         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1502                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1503                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1504         }
1505
1506         /* 
1507          * the ldb_message_element for the attribute, has all the old values and the new one
1508          * so we'll replace the whole attribute with all values
1509          */
1510         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1511
1512         if (DEBUGLVL(4)) {
1513                 char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
1514                 DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
1515                 talloc_free(s);
1516         }
1517
1518         /* prepare the ldb_modify() request */
1519         ret = ldb_build_mod_req(&change_req,
1520                                 ldb,
1521                                 ar,
1522                                 msg,
1523                                 ar->controls,
1524                                 ar,
1525                                 replmd_replicated_uptodate_modify_callback,
1526                                 ar->req);
1527         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1528
1529         return ldb_next_request(ar->module, change_req);
1530 }
1531
1532 static int replmd_replicated_uptodate_search_callback(struct ldb_request *req,
1533                                                       struct ldb_reply *ares)
1534 {
1535         struct replmd_replicated_request *ar = talloc_get_type(req->context,
1536                                                struct replmd_replicated_request);
1537         int ret;
1538
1539         if (!ares) {
1540                 return ldb_module_done(ar->req, NULL, NULL,
1541                                         LDB_ERR_OPERATIONS_ERROR);
1542         }
1543         if (ares->error != LDB_SUCCESS &&
1544             ares->error != LDB_ERR_NO_SUCH_OBJECT) {
1545                 return ldb_module_done(ar->req, ares->controls,
1546                                         ares->response, ares->error);
1547         }
1548
1549         switch (ares->type) {
1550         case LDB_REPLY_ENTRY:
1551                 ar->search_msg = talloc_steal(ar, ares->message);
1552                 break;
1553
1554         case LDB_REPLY_REFERRAL:
1555                 /* we ignore referrals */
1556                 break;
1557
1558         case LDB_REPLY_DONE:
1559                 if (ar->search_msg == NULL) {
1560                         ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1561                 } else {
1562                         ret = replmd_replicated_uptodate_modify(ar);
1563                 }
1564                 if (ret != LDB_SUCCESS) {
1565                         return ldb_module_done(ar->req, NULL, NULL, ret);
1566                 }
1567         }
1568
1569         talloc_free(ares);
1570         return LDB_SUCCESS;
1571 }
1572
1573
1574 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1575 {
1576         struct ldb_context *ldb;
1577         int ret;
1578         static const char *attrs[] = {
1579                 "replUpToDateVector",
1580                 "repsFrom",
1581                 NULL
1582         };
1583         struct ldb_request *search_req;
1584
1585         ldb = ldb_module_get_ctx(ar->module);
1586         ar->search_msg = NULL;
1587
1588         ret = ldb_build_search_req(&search_req,
1589                                    ldb,
1590                                    ar,
1591                                    ar->objs->partition_dn,
1592                                    LDB_SCOPE_BASE,
1593                                    "(objectClass=*)",
1594                                    attrs,
1595                                    NULL,
1596                                    ar,
1597                                    replmd_replicated_uptodate_search_callback,
1598                                    ar->req);
1599         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1600
1601         return ldb_next_request(ar->module, search_req);
1602 }
1603
1604
1605
1606 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1607 {
1608         struct ldb_context *ldb;
1609         struct dsdb_extended_replicated_objects *objs;
1610         struct replmd_replicated_request *ar;
1611         struct ldb_control **ctrls;
1612         int ret, i;
1613         struct dsdb_control_current_partition *partition_ctrl;
1614         struct replmd_private *replmd_private = 
1615                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1616
1617         ldb = ldb_module_get_ctx(module);
1618
1619         ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1620
1621         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1622         if (!objs) {
1623                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1624                 return LDB_ERR_PROTOCOL_ERROR;
1625         }
1626
1627         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1628                 ldb_debug(ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1629                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1630                 return LDB_ERR_PROTOCOL_ERROR;
1631         }
1632
1633         ar = replmd_ctx_init(module, req);
1634         if (!ar)
1635                 return LDB_ERR_OPERATIONS_ERROR;
1636
1637         ar->objs = objs;
1638         ar->schema = dsdb_get_schema(ldb);
1639         if (!ar->schema) {
1640                 ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
1641                 talloc_free(ar);
1642                 return LDB_ERR_CONSTRAINT_VIOLATION;
1643         }
1644
1645         ctrls = req->controls;
1646
1647         if (req->controls) {
1648                 req->controls = talloc_memdup(ar, req->controls,
1649                                               talloc_get_size(req->controls));
1650                 if (!req->controls) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1651         }
1652
1653         ret = ldb_request_add_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID, false, NULL);
1654         if (ret != LDB_SUCCESS) {
1655                 return ret;
1656         }
1657
1658         /*
1659           add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
1660           tells the partition module which partition this request is
1661           directed at. That is important as the partition roots appear
1662           twice in the directory, once as mount points in the top
1663           level store, and once as the roots of each partition. The
1664           replication code wants to operate on the root of the
1665           partitions, not the top level mount points
1666          */
1667         partition_ctrl = talloc(req, struct dsdb_control_current_partition);
1668         if (partition_ctrl == NULL) {
1669                 if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1670         }
1671         partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1672         partition_ctrl->dn = objs->partition_dn;
1673
1674         ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
1675         if (ret != LDB_SUCCESS) {
1676                 return ret;
1677         }
1678
1679         ar->controls = req->controls;
1680         req->controls = ctrls;
1681
1682         DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
1683
1684         /* save away the linked attributes for the end of the
1685            transaction */
1686         for (i=0; i<ar->objs->linked_attributes_count; i++) {
1687                 struct la_entry *la_entry;
1688
1689                 if (replmd_private == NULL) {
1690                         DEBUG(0,(__location__ ": repl_meta_data not called from within a transaction\n"));
1691                         return LDB_ERR_OPERATIONS_ERROR;
1692                 }
1693
1694                 la_entry = talloc(replmd_private, struct la_entry);
1695                 if (la_entry == NULL) {
1696                         ldb_oom(ldb);
1697                         return LDB_ERR_OPERATIONS_ERROR;
1698                 }
1699                 la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
1700                 if (la_entry->la == NULL) {
1701                         ldb_oom(ldb);
1702                         return LDB_ERR_OPERATIONS_ERROR;
1703                 }
1704                 *la_entry->la = ar->objs->linked_attributes[i];
1705
1706                 /* we need to steal the non-scalars so they stay
1707                    around until the end of the transaction */
1708                 talloc_steal(la_entry->la, la_entry->la->identifier);
1709                 talloc_steal(la_entry->la, la_entry->la->value.blob);
1710
1711                 DLIST_ADD(replmd_private->la_list, la_entry);
1712         }
1713
1714         return replmd_replicated_apply_next(ar);
1715 }
1716
1717 /*
1718   process one linked attribute structure
1719  */
1720 static int replmd_process_linked_attribute(struct ldb_module *module,
1721                                            struct la_entry *la_entry)
1722 {                                          
1723         struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
1724         struct ldb_context *ldb = ldb_module_get_ctx(module);
1725         struct drsuapi_DsReplicaObjectIdentifier3 target;
1726         struct ldb_message *msg;
1727         struct ldb_message_element *ret_el;
1728         TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
1729         enum ndr_err_code ndr_err;
1730         char *target_dn;
1731         struct ldb_request *mod_req;
1732         int ret;
1733         const struct dsdb_attribute *attr;
1734
1735 /*
1736 linked_attributes[0]:                                                     
1737      &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute 
1738         identifier               : *                                      
1739             identifier: struct drsuapi_DsReplicaObjectIdentifier          
1740                 __ndr_size               : 0x0000003a (58)                
1741                 __ndr_size_sid           : 0x00000000 (0)                 
1742                 guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
1743                 sid                      : S-0-0                               
1744                 __ndr_size_dn            : 0x00000000 (0)                      
1745                 dn                       : ''                                  
1746         attid                    : DRSUAPI_ATTRIBUTE_member (0x1F)             
1747         value: struct drsuapi_DsAttributeValue                                 
1748             __ndr_size               : 0x0000007e (126)                        
1749             blob                     : *                                       
1750                 blob                     : DATA_BLOB length=126                
1751         flags                    : 0x00000001 (1)                              
1752                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE                      
1753         originating_add_time     : Wed Sep  2 22:20:01 2009 EST                
1754         meta_data: struct drsuapi_DsReplicaMetaData                            
1755             version                  : 0x00000015 (21)                         
1756             originating_change_time  : Wed Sep  2 23:39:07 2009 EST            
1757             originating_invocation_id: 794640f3-18cf-40ee-a211-a93992b67a64    
1758             originating_usn          : 0x000000000001e19c (123292)             
1759      &target: struct drsuapi_DsReplicaObjectIdentifier3                        
1760         __ndr_size               : 0x0000007e (126)                            
1761         __ndr_size_sid           : 0x0000001c (28)                             
1762         guid                     : 7639e594-db75-4086-b0d4-67890ae46031        
1763         sid                      : S-1-5-21-2848215498-2472035911-1947525656-19924
1764         __ndr_size_dn            : 0x00000022 (34)                                
1765         dn                       : 'CN=UOne,OU=TestOU,DC=vsofs8,DC=com'           
1766  */
1767         if (DEBUGLVL(4)) {
1768                 NDR_PRINT_DEBUG(drsuapi_DsReplicaLinkedAttribute, la); 
1769         }
1770         
1771         /* decode the target of the link */
1772         ndr_err = ndr_pull_struct_blob(la->value.blob, 
1773                                        tmp_ctx, lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")), 
1774                                        &target,
1775                                        (ndr_pull_flags_fn_t)ndr_pull_drsuapi_DsReplicaObjectIdentifier3);
1776         if (ndr_err != NDR_ERR_SUCCESS) {
1777                 DEBUG(0,("Unable to decode linked_attribute target\n"));
1778                 dump_data(4, la->value.blob->data, la->value.blob->length);                     
1779                 talloc_free(tmp_ctx);
1780                 return LDB_ERR_OPERATIONS_ERROR;
1781         }
1782         if (DEBUGLVL(4)) {
1783                 NDR_PRINT_DEBUG(drsuapi_DsReplicaObjectIdentifier3, &target);
1784         }
1785
1786         /* construct a modify request for this attribute change */
1787         msg = ldb_msg_new(tmp_ctx);
1788         if (!msg) {
1789                 ldb_oom(ldb);
1790                 talloc_free(tmp_ctx);
1791                 return LDB_ERR_OPERATIONS_ERROR;
1792         }
1793
1794         ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, 
1795                                    GUID_string(tmp_ctx, &la->identifier->guid), &msg->dn);
1796         if (ret != LDB_SUCCESS) {
1797                 talloc_free(tmp_ctx);
1798                 return ret;
1799         }
1800
1801         /* find the attribute being modified */
1802         attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
1803         if (attr == NULL) {
1804                 DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
1805                 talloc_free(tmp_ctx);
1806                 return LDB_ERR_OPERATIONS_ERROR;
1807         }
1808
1809         if (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) {
1810                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1811                                         LDB_FLAG_MOD_ADD, &ret_el);
1812         } else {
1813                 ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName,
1814                                         LDB_FLAG_MOD_DELETE, &ret_el);
1815         }
1816         if (ret != LDB_SUCCESS) {
1817                 talloc_free(tmp_ctx);
1818                 return ret;
1819         }
1820         ret_el->values = talloc_array(msg, struct ldb_val, 1);
1821         if (!ret_el->values) {
1822                 ldb_oom(ldb);
1823                 talloc_free(tmp_ctx);
1824                 return LDB_ERR_OPERATIONS_ERROR;
1825         }
1826         ret_el->num_values = 1;
1827
1828         target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
1829                                     GUID_string(tmp_ctx, &target.guid),
1830                                     dom_sid_string(tmp_ctx, &target.sid),
1831                                     target.dn);
1832         if (target_dn == NULL) {
1833                 ldb_oom(ldb);
1834                 talloc_free(tmp_ctx);
1835                 return LDB_ERR_OPERATIONS_ERROR;
1836         }
1837         ret_el->values[0] = data_blob_string_const(target_dn);
1838
1839         ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
1840                                 msg,
1841                                 NULL,
1842                                 NULL, 
1843                                 ldb_op_default_callback,
1844                                 NULL);
1845         if (ret != LDB_SUCCESS) {
1846                 talloc_free(tmp_ctx);
1847                 return ret;
1848         }
1849         talloc_steal(mod_req, msg);
1850
1851         if (DEBUGLVL(4)) {
1852                 DEBUG(4,("Applying DRS linked attribute change:\n%s\n",
1853                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg)));
1854         }
1855
1856         /* Run the new request */
1857         ret = ldb_next_request(module, mod_req);
1858
1859         /* we need to wait for this to finish, as we are being called
1860            from the synchronous end_transaction hook of this module */
1861         if (ret == LDB_SUCCESS) {
1862                 ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
1863         }
1864
1865         if (ret != LDB_SUCCESS) {
1866                 ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
1867                           ldb_errstring(ldb),
1868                           ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
1869         }
1870         
1871         talloc_free(tmp_ctx);
1872
1873         return ret;     
1874 }
1875
1876 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1877 {
1878         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1879                 return replmd_extended_replicated_objects(module, req);
1880         }
1881
1882         return ldb_next_request(module, req);
1883 }
1884
1885
1886 /*
1887   we hook into the transaction operations to allow us to 
1888   perform the linked attribute updates at the end of the whole
1889   transaction. This allows a forward linked attribute to be created
1890   before the object is created. During a vampire, w2k8 sends us linked
1891   attributes before the objects they are part of.
1892  */
1893 static int replmd_start_transaction(struct ldb_module *module)
1894 {
1895         /* create our private structure for this transaction */
1896         struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
1897                                                                 struct replmd_private);
1898         talloc_free(replmd_private);
1899         replmd_private = talloc(module, struct replmd_private);
1900         if (replmd_private == NULL) {
1901                 return LDB_ERR_OPERATIONS_ERROR;
1902         }
1903         replmd_private->la_list = NULL;
1904         ldb_module_set_private(module, replmd_private);
1905         return ldb_next_start_trans(module);
1906 }
1907
1908 /*
1909   on prepare commit we loop over our queued la_context structures and
1910   apply each of them  
1911  */
1912 static int replmd_prepare_commit(struct ldb_module *module)
1913 {
1914         struct replmd_private *replmd_private = 
1915                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1916         struct la_entry *la;
1917
1918         /* walk the list backwards, to do the first entry first, as we
1919          * added the entries with DLIST_ADD() which puts them at the
1920          * start of the list */
1921         for (la = replmd_private->la_list; la && la->next; la=la->next) ;
1922
1923         for (; la; la=la->prev) {
1924                 int ret;
1925                 ret = replmd_process_linked_attribute(module, la);
1926                 if (ret != LDB_SUCCESS) {
1927                         return ret;
1928                 }
1929         }
1930
1931         talloc_free(replmd_private);
1932         ldb_module_set_private(module, NULL);
1933         
1934         return ldb_next_prepare_commit(module);
1935 }
1936
1937 static int replmd_del_transaction(struct ldb_module *module)
1938 {
1939         struct replmd_private *replmd_private = 
1940                 talloc_get_type(ldb_module_get_private(module), struct replmd_private);
1941         talloc_free(replmd_private);
1942         ldb_module_set_private(module, NULL);
1943         return ldb_next_del_trans(module);
1944 }
1945
1946
1947 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
1948         .name          = "repl_meta_data",
1949         .add           = replmd_add,
1950         .modify        = replmd_modify,
1951         .extended      = replmd_extended,
1952         .start_transaction = replmd_start_transaction,
1953         .prepare_commit    = replmd_prepare_commit,
1954         .del_transaction   = replmd_del_transaction,
1955 };