Fix DRSUAPI replication test - NET-API-BECOME-DC.
[ira/wip.git] / source / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb repl_meta_data module
31  *
32  *  Description: - add a unique objectGUID onto every new record,
33  *               - handle whenCreated, whenChanged timestamps
34  *               - handle uSNCreated, uSNChanged numbers
35  *               - handle replPropertyMetaData attribute
36  *
37  *  Author: Simo Sorce
38  *  Author: Stefan Metzmacher
39  */
40
41 #include "includes.h"
42 #include "lib/ldb/include/ldb.h"
43 #include "lib/ldb/include/ldb_errors.h"
44 #include "lib/ldb/include/ldb_private.h"
45 #include "dsdb/samdb/samdb.h"
46 #include "dsdb/common/flags.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50 #include "param/param.h"
51
52 struct replmd_replicated_request {
53         struct ldb_module *module;
54         struct ldb_handle *handle;
55         struct ldb_request *orig_req;
56
57         const struct dsdb_schema *schema;
58
59         struct dsdb_extended_replicated_objects *objs;
60
61         uint32_t index_current;
62
63         struct {
64                 TALLOC_CTX *mem_ctx;
65                 struct ldb_request *search_req;
66                 struct ldb_message *search_msg;
67                 int search_ret;
68                 struct ldb_request *change_req;
69                 int change_ret;
70         } sub;
71 };
72
73 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
74                                                                        struct ldb_request *req,
75                                                                        struct dsdb_extended_replicated_objects *objs)
76 {
77         struct replmd_replicated_request *ar;
78         struct ldb_handle *h;
79         const struct dsdb_schema *schema;
80
81         schema = dsdb_get_schema(module->ldb);
82         if (!schema) {
83                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
84                               "replmd_replicated_init_handle: no loaded schema found\n");
85                 return NULL;
86         }
87
88         h = talloc_zero(req, struct ldb_handle);
89         if (h == NULL) {
90                 ldb_set_errstring(module->ldb, "Out of Memory");
91                 return NULL;
92         }
93
94         h->module       = module;
95         h->state        = LDB_ASYNC_PENDING;
96         h->status       = LDB_SUCCESS;
97
98         ar = talloc_zero(h, struct replmd_replicated_request);
99         if (ar == NULL) {
100                 ldb_set_errstring(module->ldb, "Out of Memory");
101                 talloc_free(h);
102                 return NULL;
103         }
104
105         h->private_data = ar;
106
107         ar->module      = module;
108         ar->handle      = h;
109         ar->orig_req    = req;
110         ar->schema      = schema;
111         ar->objs        = objs;
112
113         req->handle = h;
114
115         return ar;
116 }
117
118 /*
119   add a time element to a record
120 */
121 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
122 {
123         struct ldb_message_element *el;
124         char *s;
125
126         if (ldb_msg_find_element(msg, attr) != NULL) {
127                 return 0;
128         }
129
130         s = ldb_timestring(msg, t);
131         if (s == NULL) {
132                 return -1;
133         }
134
135         if (ldb_msg_add_string(msg, attr, s) != 0) {
136                 return -1;
137         }
138
139         el = ldb_msg_find_element(msg, attr);
140         /* always set as replace. This works because on add ops, the flag
141            is ignored */
142         el->flags = LDB_FLAG_MOD_REPLACE;
143
144         return 0;
145 }
146
147 /*
148   add a uint64_t element to a record
149 */
150 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
151 {
152         struct ldb_message_element *el;
153
154         if (ldb_msg_find_element(msg, attr) != NULL) {
155                 return 0;
156         }
157
158         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
159                 return -1;
160         }
161
162         el = ldb_msg_find_element(msg, attr);
163         /* always set as replace. This works because on add ops, the flag
164            is ignored */
165         el->flags = LDB_FLAG_MOD_REPLACE;
166
167         return 0;
168 }
169
170 static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMetaData1 *m1,
171                                                    const struct replPropertyMetaData1 *m2,
172                                                    const uint32_t *rdn_attid)
173 {
174         if (m1->attid == m2->attid) {
175                 return 0;
176         }
177
178         /*
179          * the rdn attribute should be at the end!
180          * so we need to return a value greater than zero
181          * which means m1 is greater than m2
182          */
183         if (m1->attid == *rdn_attid) {
184                 return 1;
185         }
186
187         /*
188          * the rdn attribute should be at the end!
189          * so we need to return a value less than zero
190          * which means m2 is greater than m1
191          */
192         if (m2->attid == *rdn_attid) {
193                 return -1;
194         }
195
196         return m1->attid - m2->attid;
197 }
198
199 static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
200                                                  const uint32_t *rdn_attid)
201 {
202         ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
203                   discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
204 }
205
206 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
207                                                  const struct ldb_message_element *e2,
208                                                  const struct dsdb_schema *schema)
209 {
210         const struct dsdb_attribute *a1;
211         const struct dsdb_attribute *a2;
212
213         /* 
214          * TODO: make this faster by caching the dsdb_attribute pointer
215          *       on the ldb_messag_element
216          */
217
218         a1 = dsdb_attribute_by_lDAPDisplayName(schema, e1->name);
219         a2 = dsdb_attribute_by_lDAPDisplayName(schema, e2->name);
220
221         /*
222          * TODO: remove this check, we should rely on e1 and e2 having valid attribute names
223          *       in the schema
224          */
225         if (!a1 || !a2) {
226                 return strcasecmp(e1->name, e2->name);
227         }
228
229         return a1->attributeID_id - a2->attributeID_id;
230 }
231
232 static void replmd_ldb_message_sort(struct ldb_message *msg,
233                                     const struct dsdb_schema *schema)
234 {
235         ldb_qsort(msg->elements, msg->num_elements, sizeof(struct ldb_message_element),
236                   discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
237 }
238
239 static int replmd_prepare_originating(struct ldb_module *module, struct ldb_request *req,
240                                       struct ldb_dn *dn, const char *fn_name,
241                                       int (*fn)(struct ldb_module *,
242                                                 struct ldb_request *,
243                                                 const struct dsdb_schema *))
244 {
245         const struct dsdb_schema *schema;
246  
247         /* do not manipulate our control entries */
248         if (ldb_dn_is_special(dn)) {
249                 return ldb_next_request(module, req);
250         }
251
252         schema = dsdb_get_schema(module->ldb);
253         if (!schema) {
254                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
255                               "%s: no dsdb_schema loaded",
256                               fn_name);
257                 return LDB_ERR_CONSTRAINT_VIOLATION;
258         }
259
260         return fn(module, req, schema);
261 }
262
263 static int replmd_add_originating(struct ldb_module *module,
264                                   struct ldb_request *req,
265                                   const struct dsdb_schema *schema)
266 {
267         enum ndr_err_code ndr_err;
268         struct ldb_request *down_req;
269         struct ldb_message *msg;
270         const struct dsdb_attribute *rdn_attr = NULL;
271         struct GUID guid;
272         struct ldb_val guid_value;
273         struct replPropertyMetaDataBlob nmd;
274         struct ldb_val nmd_value;
275         uint64_t seq_num;
276         const struct GUID *our_invocation_id;
277         time_t t = time(NULL);
278         NTTIME now;
279         char *time_str;
280         int ret;
281         uint32_t i, ni=0;
282
283         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
284
285         if (ldb_msg_find_element(req->op.add.message, "objectGUID")) {
286                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
287                               "replmd_add_originating: it's not allowed to add an object with objectGUID\n");
288                 return LDB_ERR_UNWILLING_TO_PERFORM;
289         }
290
291         /* Get a sequence number from the backend */
292         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
293         if (ret != LDB_SUCCESS) {
294                 return ret;
295         }
296
297         /* a new GUID */
298         guid = GUID_random();
299
300         /* get our invicationId */
301         our_invocation_id = samdb_ntds_invocation_id(module->ldb);
302         if (!our_invocation_id) {
303                 ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
304                               "replmd_add_originating: unable to find invocationId\n");
305                 return LDB_ERR_OPERATIONS_ERROR;
306         }
307
308         /* create a copy of the request */
309         down_req = talloc(req, struct ldb_request);
310         if (down_req == NULL) {
311                 ldb_oom(module->ldb);
312                 return LDB_ERR_OPERATIONS_ERROR;
313         }
314         *down_req = *req;
315
316         /* we have to copy the message as the caller might have it as a const */
317         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
318         if (msg == NULL) {
319                 talloc_free(down_req);
320                 ldb_oom(module->ldb);
321                 return LDB_ERR_OPERATIONS_ERROR;
322         }
323
324         /* generated times */
325         unix_to_nt_time(&now, t);
326         time_str = ldb_timestring(msg, t);
327         if (!time_str) {
328                 talloc_free(down_req);
329                 return LDB_ERR_OPERATIONS_ERROR;
330         }
331
332         /* 
333          * remove autogenerated attributes
334          */
335         ldb_msg_remove_attr(msg, "whenCreated");
336         ldb_msg_remove_attr(msg, "whenChanged");
337         ldb_msg_remove_attr(msg, "uSNCreated");
338         ldb_msg_remove_attr(msg, "uSNChanged");
339         ldb_msg_remove_attr(msg, "replPropertyMetaData");
340
341         /*
342          * readd replicated attributes
343          */
344         ret = ldb_msg_add_string(msg, "whenCreated", time_str);
345         if (ret != LDB_SUCCESS) {
346                 talloc_free(down_req);
347                 ldb_oom(module->ldb);
348                 return LDB_ERR_OPERATIONS_ERROR;
349         }
350
351         /* build the replication meta_data */
352         ZERO_STRUCT(nmd);
353         nmd.version             = 1;
354         nmd.ctr.ctr1.count      = msg->num_elements;
355         nmd.ctr.ctr1.array      = talloc_array(msg,
356                                                struct replPropertyMetaData1,
357                                                nmd.ctr.ctr1.count);
358         if (!nmd.ctr.ctr1.array) {
359                 talloc_free(down_req);
360                 ldb_oom(module->ldb);
361                 return LDB_ERR_OPERATIONS_ERROR;
362         }
363
364         for (i=0; i < msg->num_elements; i++) {
365                 struct ldb_message_element *e = &msg->elements[i];
366                 struct replPropertyMetaData1 *m = &nmd.ctr.ctr1.array[ni];
367                 const struct dsdb_attribute *sa;
368
369                 if (e->name[0] == '@') continue;
370
371                 sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
372                 if (!sa) {
373                         ldb_debug_set(module->ldb, LDB_DEBUG_ERROR,
374                                       "replmd_add_originating: attribute '%s' not defined in schema\n",
375                                       e->name);
376                         talloc_free(down_req);
377                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
378                 }
379
380                 if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
381                         /* if the attribute is not replicated (0x00000001)
382                          * or constructed (0x00000004) it has no metadata
383                          */
384                         continue;
385                 }
386
387                 m->attid                        = sa->attributeID_id;
388                 m->version                      = 1;
389                 m->originating_change_time      = now;
390                 m->originating_invocation_id    = *our_invocation_id;
391                 m->originating_usn              = seq_num;
392                 m->local_usn                    = seq_num;
393                 ni++;
394         }
395
396         /* fix meta data count */
397         nmd.ctr.ctr1.count = ni;
398
399         /*
400          * sort meta data array, and move the rdn attribute entry to the end
401          */
402         replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
403
404         /* generated NDR encoded values */
405         ndr_err = ndr_push_struct_blob(&guid_value, msg, 
406                                        NULL,
407                                        &guid,
408                                        (ndr_push_flags_fn_t)ndr_push_GUID);
409         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
410                 ldb_oom(module->ldb);
411                 return LDB_ERR_OPERATIONS_ERROR;
412         }
413         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
414                                        lp_iconv_convenience(ldb_get_opaque(module->ldb, "loadparm")),
415                                        &nmd,
416                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
417         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
418                 talloc_free(down_req);
419                 ldb_oom(module->ldb);
420                 return LDB_ERR_OPERATIONS_ERROR;
421         }
422
423         /*
424          * add the autogenerated values
425          */
426         ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
427         if (ret != LDB_SUCCESS) {
428                 talloc_free(down_req);
429                 ldb_oom(module->ldb);
430                 return LDB_ERR_OPERATIONS_ERROR;
431         }
432         ret = ldb_msg_add_string(msg, "whenChanged", time_str);
433         if (ret != LDB_SUCCESS) {
434                 talloc_free(down_req);
435                 ldb_oom(module->ldb);
436                 return LDB_ERR_OPERATIONS_ERROR;
437         }
438         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNCreated", seq_num);
439         if (ret != LDB_SUCCESS) {
440                 talloc_free(down_req);
441                 ldb_oom(module->ldb);
442                 return LDB_ERR_OPERATIONS_ERROR;
443         }
444         ret = samdb_msg_add_uint64(module->ldb, msg, msg, "uSNChanged", seq_num);
445         if (ret != LDB_SUCCESS) {
446                 talloc_free(down_req);
447                 ldb_oom(module->ldb);
448                 return LDB_ERR_OPERATIONS_ERROR;
449         }
450         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
451         if (ret != LDB_SUCCESS) {
452                 talloc_free(down_req);
453                 ldb_oom(module->ldb);
454                 return LDB_ERR_OPERATIONS_ERROR;
455         }
456
457         /*
458          * sort the attributes by attid before storing the object
459          */
460         replmd_ldb_message_sort(msg, schema);
461
462         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
463
464         /* go on with the call chain */
465         ret = ldb_next_request(module, down_req);
466
467         /* do not free down_req as the call results may be linked to it,
468          * it will be freed when the upper level request get freed */
469         if (ret == LDB_SUCCESS) {
470                 req->handle = down_req->handle;
471         }
472
473         return ret;
474 }
475
476 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
477 {
478         return replmd_prepare_originating(module, req, req->op.add.message->dn,
479                                           "replmd_add", replmd_add_originating);
480 }
481
482 static int replmd_modify_originating(struct ldb_module *module,
483                                      struct ldb_request *req,
484                                      const struct dsdb_schema *schema)
485 {
486         struct ldb_request *down_req;
487         struct ldb_message *msg;
488         int ret;
489         time_t t = time(NULL);
490         uint64_t seq_num;
491
492         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
493
494         down_req = talloc(req, struct ldb_request);
495         if (down_req == NULL) {
496                 return LDB_ERR_OPERATIONS_ERROR;
497         }
498
499         *down_req = *req;
500
501         /* we have to copy the message as the caller might have it as a const */
502         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
503         if (msg == NULL) {
504                 talloc_free(down_req);
505                 return LDB_ERR_OPERATIONS_ERROR;
506         }
507
508         if (add_time_element(msg, "whenChanged", t) != 0) {
509                 talloc_free(down_req);
510                 return LDB_ERR_OPERATIONS_ERROR;
511         }
512
513         /* Get a sequence number from the backend */
514         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
515         if (ret == LDB_SUCCESS) {
516                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
517                         talloc_free(down_req);
518                         return LDB_ERR_OPERATIONS_ERROR;
519                 }
520         }
521
522         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
523
524         /* go on with the call chain */
525         ret = ldb_next_request(module, down_req);
526
527         /* do not free down_req as the call results may be linked to it,
528          * it will be freed when the upper level request get freed */
529         if (ret == LDB_SUCCESS) {
530                 req->handle = down_req->handle;
531         }
532
533         return ret;
534 }
535
536 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
537 {
538         return replmd_prepare_originating(module, req, req->op.mod.message->dn,
539                                           "replmd_modify", replmd_modify_originating);
540 }
541
542 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
543 {
544         struct ldb_reply *ares = NULL;
545
546         ar->handle->status = ret;
547         ar->handle->state = LDB_ASYNC_DONE;
548
549         if (!ar->orig_req->callback) {
550                 return LDB_SUCCESS;
551         }
552         
553         /* we're done and need to report the success to the caller */
554         ares = talloc_zero(ar, struct ldb_reply);
555         if (!ares) {
556                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
557                 ar->handle->state = LDB_ASYNC_DONE;
558                 return LDB_ERR_OPERATIONS_ERROR;
559         }
560
561         ares->type      = LDB_REPLY_EXTENDED;
562         ares->response  = NULL;
563
564         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
565 }
566
567 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
568 {
569         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
570 }
571
/* Finish the replicated request with the LDB error code 'ret'. */
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
        const int status = ret;

        return replmd_replicated_request_reply_helper(ar, status);
}
576
577 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
578 {
579         int ret = LDB_ERR_OTHER;
580         /* TODO: do some error mapping */
581         return replmd_replicated_request_reply_helper(ar, ret);
582 }
583
584 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
585
586 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
587                                                 void *private_data,
588                                                 struct ldb_reply *ares)
589 {
590 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
591         struct replmd_replicated_request *ar = talloc_get_type(private_data,
592                                                struct replmd_replicated_request);
593
594         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
595         if (ar->sub.change_ret != LDB_SUCCESS) {
596                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
597         }
598
599         talloc_free(ar->sub.mem_ctx);
600         ZERO_STRUCT(ar->sub);
601
602         ar->index_current++;
603
604         return replmd_replicated_apply_next(ar);
605 #else
606         return LDB_SUCCESS;
607 #endif
608 }
609
610 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
611 {
612         enum ndr_err_code ndr_err;
613         struct ldb_message *msg;
614         struct replPropertyMetaDataBlob *md;
615         struct ldb_val md_value;
616         uint32_t i;
617         uint64_t seq_num;
618         int ret;
619
620         /*
621          * TODO: check if the parent object exist
622          */
623
624         /*
625          * TODO: handle the conflict case where an object with the
626          *       same name exist
627          */
628
629         msg = ar->objs->objects[ar->index_current].msg;
630         md = ar->objs->objects[ar->index_current].meta_data;
631
632         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
633         if (ret != LDB_SUCCESS) {
634                 return replmd_replicated_request_error(ar, ret);
635         }
636
637         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
638         if (ret != LDB_SUCCESS) {
639                 return replmd_replicated_request_error(ar, ret);
640         }
641
642         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
643         if (ret != LDB_SUCCESS) {
644                 return replmd_replicated_request_error(ar, ret);
645         }
646
647         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
648         if (ret != LDB_SUCCESS) {
649                 return replmd_replicated_request_error(ar, ret);
650         }
651
652         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
653         if (ret != LDB_SUCCESS) {
654                 return replmd_replicated_request_error(ar, ret);
655         }
656
657         /*
658          * the meta data array is already sorted by the caller
659          */
660         for (i=0; i < md->ctr.ctr1.count; i++) {
661                 md->ctr.ctr1.array[i].local_usn = seq_num;
662         }
663         ndr_err = ndr_push_struct_blob(&md_value, msg, 
664                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
665                                        md,
666                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
667         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
668                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
669                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
670         }
671         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
672         if (ret != LDB_SUCCESS) {
673                 return replmd_replicated_request_error(ar, ret);
674         }
675
676         replmd_ldb_message_sort(msg, ar->schema);
677
678         ret = ldb_build_add_req(&ar->sub.change_req,
679                                 ar->module->ldb,
680                                 ar->sub.mem_ctx,
681                                 msg,
682                                 NULL,
683                                 ar,
684                                 replmd_replicated_apply_add_callback);
685         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
686
687 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
688         return ldb_next_request(ar->module, ar->sub.change_req);
689 #else
690         ret = ldb_next_request(ar->module, ar->sub.change_req);
691         if (ret != LDB_SUCCESS) {
692                 ldb_asprintf_errstring(ar->module->ldb, "Failed to add replicated object %s: %s", ldb_dn_get_linearized(ar->sub.change_req->op.add.message->dn), 
693                                        ldb_errstring(ar->module->ldb));
694                 return replmd_replicated_request_error(ar, ret);
695         }
696
697         ar->sub.change_ret = ldb_wait(ar->sub.change_req->handle, LDB_WAIT_ALL);
698         if (ar->sub.change_ret != LDB_SUCCESS) {
699                 ldb_asprintf_errstring(ar->module->ldb, "Failed while waiting on add replicated object %s: %s", ldb_dn_get_linearized(ar->sub.change_req->op.add.message->dn), 
700                                        ldb_errstring(ar->module->ldb));
701                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
702         }
703
704         talloc_free(ar->sub.mem_ctx);
705         ZERO_STRUCT(ar->sub);
706
707         ar->index_current++;
708
709         return LDB_SUCCESS;
710 #endif
711 }
712
713 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
714                                                          struct replPropertyMetaData1 *m2)
715 {
716         int ret;
717
718         if (m1->version != m2->version) {
719                 return m1->version - m2->version;
720         }
721
722         if (m1->originating_change_time != m2->originating_change_time) {
723                 return m1->originating_change_time - m2->originating_change_time;
724         }
725
726         ret = GUID_compare(&m1->originating_invocation_id, &m2->originating_invocation_id);
727         if (ret != 0) {
728                 return ret;
729         }
730
731         return m1->originating_usn - m2->originating_usn;
732 }
733
734 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
735                                                   void *private_data,
736                                                   struct ldb_reply *ares)
737 {
738 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
739         struct replmd_replicated_request *ar = talloc_get_type(private_data,
740                                                struct replmd_replicated_request);
741
742         ret = ldb_next_request(ar->module, ar->sub.change_req);
743         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
744
745         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
746         if (ar->sub.change_ret != LDB_SUCCESS) {
747                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
748         }
749
750         talloc_free(ar->sub.mem_ctx);
751         ZERO_STRUCT(ar->sub);
752
753         ar->index_current++;
754
755         return LDB_SUCCESS;
756 #else
757         return LDB_SUCCESS;
758 #endif
759 }
760
761 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
762 {
763         enum ndr_err_code ndr_err;
764         struct ldb_message *msg;
765         struct replPropertyMetaDataBlob *rmd;
766         struct replPropertyMetaDataBlob omd;
767         const struct ldb_val *omd_value;
768         struct replPropertyMetaDataBlob nmd;
769         struct ldb_val nmd_value;
770         uint32_t i,j,ni=0;
771         uint32_t removed_attrs = 0;
772         uint64_t seq_num;
773         int ret;
774
775         msg = ar->objs->objects[ar->index_current].msg;
776         rmd = ar->objs->objects[ar->index_current].meta_data;
777         ZERO_STRUCT(omd);
778         omd.version = 1;
779
780         /*
781          * TODO: add rename conflict handling
782          */
783         if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
784                 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
785                               ar->index_current);
786                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
787                           ldb_dn_get_linearized(ar->sub.search_msg->dn),
788                           ldb_dn_get_linearized(msg->dn));
789                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
790         }
791
792         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
793         if (ret != LDB_SUCCESS) {
794                 return replmd_replicated_request_error(ar, ret);
795         }
796
797         /* find existing meta data */
798         omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
799         if (omd_value) {
800                 ndr_err = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, 
801                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &omd,
802                                                (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
803                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
804                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
805                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
806                 }
807
808                 if (omd.version != 1) {
809                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
810                 }
811         }
812
813         ZERO_STRUCT(nmd);
814         nmd.version = 1;
815         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
816         nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
817                                           struct replPropertyMetaData1,
818                                           nmd.ctr.ctr1.count);
819         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
820
821         /* first copy the old meta data */
822         for (i=0; i < omd.ctr.ctr1.count; i++) {
823                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
824                 ni++;
825         }
826
827         /* now merge in the new meta data */
828         for (i=0; i < rmd->ctr.ctr1.count; i++) {
829                 bool found = false;
830
831                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
832
833                 for (j=0; j < ni; j++) {
834                         int cmp;
835
836                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
837                                 continue;
838                         }
839
840                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
841                                                                             &nmd.ctr.ctr1.array[j]);
842                         if (cmp > 0) {
843                                 /* replace the entry */
844                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
845                                 found = true;
846                                 break;
847                         }
848
849                         /* we don't want to apply this change so remove the attribute */
850                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
851                         removed_attrs++;
852
853                         found = true;
854                         break;
855                 }
856
857                 if (found) continue;
858
859                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
860                 ni++;
861         }
862
863         /*
864          * finally correct the size of the meta_data array
865          */
866         nmd.ctr.ctr1.count = ni;
867
868         /*
869          * the rdn attribute (the alias for the name attribute),
870          * 'cn' for most objects is the last entry in the meta data array
871          * we have stored
872          *
873          * sort the new meta data array
874          */
875         {
876                 struct replPropertyMetaData1 *rdn_p;
877                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
878
879                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
880                 replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
881         }
882
883         /* create the meta data value */
884         ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
885                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
886                                        &nmd,
887                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
888         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
889                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
890                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
891         }
892
893         /*
894          * check if some replicated attributes left, otherwise skip the ldb_modify() call
895          */
896         if (msg->num_elements == 0) {
897                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
898                           ar->index_current);
899                 goto next_object;
900         }
901
902         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
903                   ar->index_current, msg->num_elements);
904
905         /*
906          * when we now that we'll modify the record, add the whenChanged, uSNChanged
907          * and replPopertyMetaData attributes
908          */
909         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
910         if (ret != LDB_SUCCESS) {
911                 return replmd_replicated_request_error(ar, ret);
912         }
913         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
914         if (ret != LDB_SUCCESS) {
915                 return replmd_replicated_request_error(ar, ret);
916         }
917         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
918         if (ret != LDB_SUCCESS) {
919                 return replmd_replicated_request_error(ar, ret);
920         }
921
922         replmd_ldb_message_sort(msg, ar->schema);
923
924         /* we want to replace the old values */
925         for (i=0; i < msg->num_elements; i++) {
926                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
927         }
928
929         ret = ldb_build_mod_req(&ar->sub.change_req,
930                                 ar->module->ldb,
931                                 ar->sub.mem_ctx,
932                                 msg,
933                                 NULL,
934                                 ar,
935                                 replmd_replicated_apply_merge_callback);
936         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
937
938 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
939         return ldb_next_request(ar->module, ar->sub.change_req);
940 #else
941         ret = ldb_next_request(ar->module, ar->sub.change_req);
942         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
943
944         ar->sub.change_ret = ldb_wait(ar->sub.change_req->handle, LDB_WAIT_ALL);
945         if (ar->sub.change_ret != LDB_SUCCESS) {
946                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
947         }
948
949 next_object:
950         talloc_free(ar->sub.mem_ctx);
951         ZERO_STRUCT(ar->sub);
952
953         ar->index_current++;
954
955         return LDB_SUCCESS;
956 #endif
957 }
958
959 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
960                                                    void *private_data,
961                                                    struct ldb_reply *ares)
962 {
963         struct replmd_replicated_request *ar = talloc_get_type(private_data,
964                                                struct replmd_replicated_request);
965         bool is_done = false;
966
967         switch (ares->type) {
968         case LDB_REPLY_ENTRY:
969                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
970                 break;
971         case LDB_REPLY_REFERRAL:
972                 /* we ignore referrals */
973                 break;
974         case LDB_REPLY_EXTENDED:
975         case LDB_REPLY_DONE:
976                 is_done = true;
977         }
978
979         talloc_free(ares);
980
981 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
982         if (is_done) {
983                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
984                 if (ar->sub.search_ret != LDB_SUCCESS) {
985                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
986                 }
987                 if (ar->sub.search_msg) {
988                         return replmd_replicated_apply_merge(ar);
989                 }
990                 return replmd_replicated_apply_add(ar);
991         }
992 #endif
993         return LDB_SUCCESS;
994 }
995
996 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
997 {
998         int ret;
999         char *tmp_str;
1000         char *filter;
1001
1002         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
1003         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1004
1005         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
1006         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1007         talloc_free(tmp_str);
1008
1009         ret = ldb_build_search_req(&ar->sub.search_req,
1010                                    ar->module->ldb,
1011                                    ar->sub.mem_ctx,
1012                                    ar->objs->partition_dn,
1013                                    LDB_SCOPE_SUBTREE,
1014                                    filter,
1015                                    NULL,
1016                                    NULL,
1017                                    ar,
1018                                    replmd_replicated_apply_search_callback);
1019         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1020
1021 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1022         return ldb_next_request(ar->module, ar->sub.search_req);
1023 #else
1024         ret = ldb_next_request(ar->module, ar->sub.search_req);
1025         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1026
1027         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1028         if (ar->sub.search_ret != LDB_SUCCESS && ar->sub.search_ret != LDB_ERR_NO_SUCH_OBJECT) {
1029                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1030         }
1031         if (ar->sub.search_msg) {
1032                 return replmd_replicated_apply_merge(ar);
1033         }
1034
1035         return replmd_replicated_apply_add(ar);
1036 #endif
1037 }
1038
1039 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
1040 {
1041 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1042         if (ar->index_current >= ar->objs->num_objects) {
1043                 return replmd_replicated_uptodate_vector(ar);
1044         }
1045 #endif
1046
1047         ar->sub.mem_ctx = talloc_new(ar);
1048         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1049
1050         return replmd_replicated_apply_search(ar);
1051 }
1052
1053 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
1054                                                       void *private_data,
1055                                                       struct ldb_reply *ares)
1056 {
1057 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1058         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1059                                                struct replmd_replicated_request);
1060
1061         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1062         if (ar->sub.change_ret != LDB_SUCCESS) {
1063                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1064         }
1065
1066         talloc_free(ar->sub.mem_ctx);
1067         ZERO_STRUCT(ar->sub);
1068
1069         return replmd_replicated_request_done(ar);
1070 #else
1071         return LDB_SUCCESS;
1072 #endif
1073 }
1074
1075 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
1076                                                    const struct drsuapi_DsReplicaCursor2 *c2)
1077 {
1078         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
1079 }
1080
1081 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
1082 {
1083         enum ndr_err_code ndr_err;
1084         struct ldb_message *msg;
1085         struct replUpToDateVectorBlob ouv;
1086         const struct ldb_val *ouv_value;
1087         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
1088         struct replUpToDateVectorBlob nuv;
1089         struct ldb_val nuv_value;
1090         struct ldb_message_element *nuv_el = NULL;
1091         const struct GUID *our_invocation_id;
1092         struct ldb_message_element *orf_el = NULL;
1093         struct repsFromToBlob nrf;
1094         struct ldb_val *nrf_value = NULL;
1095         struct ldb_message_element *nrf_el = NULL;
1096         uint32_t i,j,ni=0;
1097         uint64_t seq_num;
1098         bool found = false;
1099         time_t t = time(NULL);
1100         NTTIME now;
1101         int ret;
1102
1103         ruv = ar->objs->uptodateness_vector;
1104         ZERO_STRUCT(ouv);
1105         ouv.version = 2;
1106         ZERO_STRUCT(nuv);
1107         nuv.version = 2;
1108
1109         unix_to_nt_time(&now, t);
1110
1111         /* 
1112          * we use the next sequence number for our own highest_usn
1113          * because we will do a modify request and this will increment
1114          * our highest_usn
1115          */
1116         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
1117         if (ret != LDB_SUCCESS) {
1118                 return replmd_replicated_request_error(ar, ret);
1119         }
1120
1121         /*
1122          * first create the new replUpToDateVector
1123          */
1124         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
1125         if (ouv_value) {
1126                 ndr_err = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, 
1127                                                lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), &ouv,
1128                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
1129                 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1130                         NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1131                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1132                 }
1133
1134                 if (ouv.version != 2) {
1135                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1136                 }
1137         }
1138
1139         /*
1140          * the new uptodateness vector will at least
1141          * contain 1 entry, one for the source_dsa
1142          *
1143          * plus optional values from our old vector and the one from the source_dsa
1144          */
1145         nuv.ctr.ctr2.count = 1 + ouv.ctr.ctr2.count;
1146         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1147         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
1148                                             struct drsuapi_DsReplicaCursor2,
1149                                             nuv.ctr.ctr2.count);
1150         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1151
1152         /* first copy the old vector */
1153         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1154                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1155                 ni++;
1156         }
1157
1158         /* get our invocation_id if we have one already attached to the ldb */
1159         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1160
1161         /* merge in the source_dsa vector is available */
1162         for (i=0; (ruv && i < ruv->count); i++) {
1163                 found = false;
1164
1165                 if (our_invocation_id &&
1166                     GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1167                                our_invocation_id)) {
1168                         continue;
1169                 }
1170
1171                 for (j=0; j < ni; j++) {
1172                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1173                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1174                                 continue;
1175                         }
1176
1177                         found = true;
1178
1179                         /*
1180                          * we update only the highest_usn and not the latest_sync_success time,
1181                          * because the last success stands for direct replication
1182                          */
1183                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1184                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1185                         }
1186                         break;                  
1187                 }
1188
1189                 if (found) continue;
1190
1191                 /* if it's not there yet, add it */
1192                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1193                 ni++;
1194         }
1195
1196         /*
1197          * merge in the current highwatermark for the source_dsa
1198          */
1199         found = false;
1200         for (j=0; j < ni; j++) {
1201                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1202                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1203                         continue;
1204                 }
1205
1206                 found = true;
1207
1208                 /*
1209                  * here we update the highest_usn and last_sync_success time
1210                  * because we're directly replicating from the source_dsa
1211                  *
1212                  * and use the tmp_highest_usn because this is what we have just applied
1213                  * to our ldb
1214                  */
1215                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1216                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1217                 break;
1218         }
1219         if (!found) {
1220                 /*
1221                  * here we update the highest_usn and last_sync_success time
1222                  * because we're directly replicating from the source_dsa
1223                  *
1224                  * and use the tmp_highest_usn because this is what we have just applied
1225                  * to our ldb
1226                  */
1227                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1228                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1229                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1230                 ni++;
1231         }
1232
1233         /*
1234          * finally correct the size of the cursors array
1235          */
1236         nuv.ctr.ctr2.count = ni;
1237
1238         /*
1239          * sort the cursors
1240          */
1241         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1242               sizeof(struct drsuapi_DsReplicaCursor2),
1243               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1244
1245         /*
1246          * create the change ldb_message
1247          */
1248         msg = ldb_msg_new(ar->sub.mem_ctx);
1249         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1250         msg->dn = ar->sub.search_msg->dn;
1251
1252         ndr_err = ndr_push_struct_blob(&nuv_value, msg, 
1253                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), 
1254                                        &nuv,
1255                                        (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1256         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1257                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1258                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1259         }
1260         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1261         if (ret != LDB_SUCCESS) {
1262                 return replmd_replicated_request_error(ar, ret);
1263         }
1264         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1265
1266         /*
1267          * now create the new repsFrom value from the given repsFromTo1 structure
1268          */
1269         ZERO_STRUCT(nrf);
1270         nrf.version                                     = 1;
1271         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1272         /* and fix some values... */
1273         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1274         nrf.ctr.ctr1.last_success                       = now;
1275         nrf.ctr.ctr1.last_attempt                       = now;
1276         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1277         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1278
1279         /*
1280          * first see if we already have a repsFrom value for the current source dsa
1281          * if so we'll later replace this value
1282          */
1283         orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1284         if (orf_el) {
1285                 for (i=0; i < orf_el->num_values; i++) {
1286                         struct repsFromToBlob *trf;
1287
1288                         trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1289                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1290
1291                         ndr_err = ndr_pull_struct_blob(&orf_el->values[i], trf, lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")), trf,
1292                                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1293                         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1294                                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1295                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1296                         }
1297
1298                         if (trf->version != 1) {
1299                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1300                         }
1301
1302                         /*
1303                          * we compare the source dsa objectGUID not the invocation_id
1304                          * because we want only one repsFrom value per source dsa
1305                          * and when the invocation_id of the source dsa has changed we don't need 
1306                          * the old repsFrom with the old invocation_id
1307                          */
1308                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1309                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1310                                 talloc_free(trf);
1311                                 continue;
1312                         }
1313
1314                         talloc_free(trf);
1315                         nrf_value = &orf_el->values[i];
1316                         break;
1317                 }
1318
1319                 /*
1320                  * copy over all old values to the new ldb_message
1321                  */
1322                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1323                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1324                 *nrf_el = *orf_el;
1325         }
1326
1327         /*
1328          * if we haven't found an old repsFrom value for the current source dsa
1329          * we'll add a new value
1330          */
1331         if (!nrf_value) {
1332                 struct ldb_val zero_value;
1333                 ZERO_STRUCT(zero_value);
1334                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1335                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1336
1337                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1338         }
1339
1340         /* we now fill the value which is already attached to ldb_message */
1341         ndr_err = ndr_push_struct_blob(nrf_value, msg, 
1342                                        lp_iconv_convenience(ldb_get_opaque(ar->module->ldb, "loadparm")),
1343                                        &nrf,
1344                                        (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1345         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1346                 NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
1347                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1348         }
1349
1350         /* 
1351          * the ldb_message_element for the attribute, has all the old values and the new one
1352          * so we'll replace the whole attribute with all values
1353          */
1354         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1355
1356         /* prepare the ldb_modify() request */
1357         ret = ldb_build_mod_req(&ar->sub.change_req,
1358                                 ar->module->ldb,
1359                                 ar->sub.mem_ctx,
1360                                 msg,
1361                                 NULL,
1362                                 ar,
1363                                 replmd_replicated_uptodate_modify_callback);
1364         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1365
1366 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1367         return ldb_next_request(ar->module, ar->sub.change_req);
1368 #else
1369         ret = ldb_next_request(ar->module, ar->sub.change_req);
1370         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1371
1372         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1373         if (ar->sub.change_ret != LDB_SUCCESS) {
1374                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1375         }
1376
1377         talloc_free(ar->sub.mem_ctx);
1378         ZERO_STRUCT(ar->sub);
1379
1380         return replmd_replicated_request_done(ar);
1381 #endif
1382 }
1383
1384 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1385                                                       void *private_data,
1386                                                       struct ldb_reply *ares)
1387 {
1388         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1389                                                struct replmd_replicated_request);
1390         bool is_done = false;
1391
1392         switch (ares->type) {
1393         case LDB_REPLY_ENTRY:
1394                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1395                 break;
1396         case LDB_REPLY_REFERRAL:
1397                 /* we ignore referrals */
1398                 break;
1399         case LDB_REPLY_EXTENDED:
1400         case LDB_REPLY_DONE:
1401                 is_done = true;
1402         }
1403
1404         talloc_free(ares);
1405
1406 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1407         if (is_done) {
1408                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1409                 if (ar->sub.search_ret != LDB_SUCCESS) {
1410                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1411                 }
1412                 if (!ar->sub.search_msg) {
1413                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1414                 }
1415
1416                 return replmd_replicated_uptodate_modify(ar);
1417         }
1418 #endif
1419         return LDB_SUCCESS;
1420 }
1421
1422 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1423 {
1424         int ret;
1425         static const char *attrs[] = {
1426                 "replUpToDateVector",
1427                 "repsFrom",
1428                 NULL
1429         };
1430
1431         ret = ldb_build_search_req(&ar->sub.search_req,
1432                                    ar->module->ldb,
1433                                    ar->sub.mem_ctx,
1434                                    ar->objs->partition_dn,
1435                                    LDB_SCOPE_BASE,
1436                                    "(objectClass=*)",
1437                                    attrs,
1438                                    NULL,
1439                                    ar,
1440                                    replmd_replicated_uptodate_search_callback);
1441         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1442
1443 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1444         return ldb_next_request(ar->module, ar->sub.search_req);
1445 #else
1446         ret = ldb_next_request(ar->module, ar->sub.search_req);
1447         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1448
1449         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1450         if (ar->sub.search_ret != LDB_SUCCESS) {
1451                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1452         }
1453         if (!ar->sub.search_msg) {
1454                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1455         }
1456
1457         return replmd_replicated_uptodate_modify(ar);
1458 #endif
1459 }
1460
1461 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1462 {
1463         ar->sub.mem_ctx = talloc_new(ar);
1464         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1465
1466         return replmd_replicated_uptodate_search(ar);
1467 }
1468
1469 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1470 {
1471         struct dsdb_extended_replicated_objects *objs;
1472         struct replmd_replicated_request *ar;
1473
1474         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1475
1476         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1477         if (!objs) {
1478                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1479                 return LDB_ERR_PROTOCOL_ERROR;
1480         }
1481
1482         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1483                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1484                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1485                 return LDB_ERR_PROTOCOL_ERROR;
1486         }
1487
1488         ar = replmd_replicated_init_handle(module, req, objs);
1489         if (!ar) {
1490                 return LDB_ERR_OPERATIONS_ERROR;
1491         }
1492
1493 #ifdef REPLMD_FULL_ASYNC /* TODO: activate this code when ldb support full async code */ 
1494         return replmd_replicated_apply_next(ar);
1495 #else
1496         while (ar->index_current < ar->objs->num_objects &&
1497                req->handle->state != LDB_ASYNC_DONE) { 
1498                 replmd_replicated_apply_next(ar);
1499         }
1500
1501         if (req->handle->state != LDB_ASYNC_DONE) {
1502                 replmd_replicated_uptodate_vector(ar);
1503         }
1504
1505         return LDB_SUCCESS;
1506 #endif
1507 }
1508
1509 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1510 {
1511         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1512                 return replmd_extended_replicated_objects(module, req);
1513         }
1514
1515         return ldb_next_request(module, req);
1516 }
1517
1518 static int replmd_wait_none(struct ldb_handle *handle) {
1519         struct replmd_replicated_request *ar;
1520     
1521         if (!handle || !handle->private_data) {
1522                 return LDB_ERR_OPERATIONS_ERROR;
1523         }
1524
1525         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1526         if (!ar) {
1527                 return LDB_ERR_OPERATIONS_ERROR;
1528         }
1529
1530         /* we do only sync calls */
1531         if (handle->state != LDB_ASYNC_DONE) {
1532                 return LDB_ERR_OPERATIONS_ERROR;
1533         }
1534
1535         return handle->status;
1536 }
1537
1538 static int replmd_wait_all(struct ldb_handle *handle) {
1539
1540         int ret;
1541
1542         while (handle->state != LDB_ASYNC_DONE) {
1543                 ret = replmd_wait_none(handle);
1544                 if (ret != LDB_SUCCESS) {
1545                         return ret;
1546                 }
1547         }
1548
1549         return handle->status;
1550 }
1551
1552 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1553 {
1554         if (type == LDB_WAIT_ALL) {
1555                 return replmd_wait_all(handle);
1556         } else {
1557                 return replmd_wait_none(handle);
1558         }
1559 }
1560
1561 static const struct ldb_module_ops replmd_ops = {
1562         .name          = "repl_meta_data",
1563         .add           = replmd_add,
1564         .modify        = replmd_modify,
1565         .extended      = replmd_extended,
1566         .wait          = replmd_wait
1567 };
1568
1569 int repl_meta_data_module_init(void)
1570 {
1571         return ldb_register_module(&replmd_ops);
1572 }