r20863: check that there's a current partition control attached to the request
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
1 /* 
2    ldb database library
3
4    Copyright (C) Simo Sorce  2004-2006
5    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6    Copyright (C) Andrew Tridgell 2005
7    Copyright (C) Stefan Metzmacher 2007
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb
30  *
31  *  Component: ldb repl_meta_data module
32  *
33  *  Description: - add a unique objectGUID onto every new record,
34  *               - handle whenCreated, whenChanged timestamps
35  *               - handle uSNCreated, uSNChanged numbers
36  *               - handle replPropertyMetaData attribute
37  *
38  *  Author: Simo Sorce
39  *  Author: Stefan Metzmacher
40  */
41
42 #include "includes.h"
43 #include "lib/ldb/include/ldb.h"
44 #include "lib/ldb/include/ldb_errors.h"
45 #include "lib/ldb/include/ldb_private.h"
46 #include "dsdb/samdb/samdb.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
50
51 struct replmd_replicated_request {
52         struct ldb_module *module;
53         struct ldb_handle *handle;
54         struct ldb_request *orig_req;
55
56         struct dsdb_extended_replicated_objects *objs;
57
58         uint32_t index_current;
59
60         struct {
61                 TALLOC_CTX *mem_ctx;
62                 struct ldb_request *search_req;
63                 struct ldb_message *search_msg;
64                 int search_ret;
65                 struct ldb_request *change_req;
66                 int change_ret;
67         } sub;
68 };
69
70 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
71                                                                        struct ldb_request *req,
72                                                                        struct dsdb_extended_replicated_objects *objs)
73 {
74         struct replmd_replicated_request *ar;
75         struct ldb_handle *h;
76
77         h = talloc_zero(req, struct ldb_handle);
78         if (h == NULL) {
79                 ldb_set_errstring(module->ldb, "Out of Memory");
80                 return NULL;
81         }
82
83         h->module       = module;
84         h->state        = LDB_ASYNC_PENDING;
85         h->status       = LDB_SUCCESS;
86
87         ar = talloc_zero(h, struct replmd_replicated_request);
88         if (ar == NULL) {
89                 ldb_set_errstring(module->ldb, "Out of Memory");
90                 talloc_free(h);
91                 return NULL;
92         }
93
94         h->private_data = ar;
95
96         ar->module      = module;
97         ar->handle      = h;
98         ar->orig_req    = req;
99         ar->objs        = objs;
100
101         req->handle = h;
102
103         return ar;
104 }
105
106 static struct ldb_message_element *replmd_find_attribute(const struct ldb_message *msg, const char *name)
107 {
108         int i;
109
110         for (i = 0; i < msg->num_elements; i++) {
111                 if (ldb_attr_cmp(name, msg->elements[i].name) == 0) {
112                         return &msg->elements[i];
113                 }
114         }
115
116         return NULL;
117 }
118
119 /*
120   add a time element to a record
121 */
122 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
123 {
124         struct ldb_message_element *el;
125         char *s;
126
127         if (ldb_msg_find_element(msg, attr) != NULL) {
128                 return 0;
129         }
130
131         s = ldb_timestring(msg, t);
132         if (s == NULL) {
133                 return -1;
134         }
135
136         if (ldb_msg_add_string(msg, attr, s) != 0) {
137                 return -1;
138         }
139
140         el = ldb_msg_find_element(msg, attr);
141         /* always set as replace. This works because on add ops, the flag
142            is ignored */
143         el->flags = LDB_FLAG_MOD_REPLACE;
144
145         return 0;
146 }
147
148 /*
149   add a uint64_t element to a record
150 */
151 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
152 {
153         struct ldb_message_element *el;
154
155         if (ldb_msg_find_element(msg, attr) != NULL) {
156                 return 0;
157         }
158
159         if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
160                 return -1;
161         }
162
163         el = ldb_msg_find_element(msg, attr);
164         /* always set as replace. This works because on add ops, the flag
165            is ignored */
166         el->flags = LDB_FLAG_MOD_REPLACE;
167
168         return 0;
169 }
170
171 static int replmd_add_originating(struct ldb_module *module,
172                                   struct ldb_request *req,
173                                   const struct dsdb_schema *schema,
174                                   const struct dsdb_control_current_partition *partition)
175 {
176         struct ldb_request *down_req;
177         struct ldb_message_element *attribute;
178         struct ldb_message *msg;
179         struct ldb_val v;
180         struct GUID guid;
181         uint64_t seq_num;
182         NTSTATUS nt_status;
183         int ret;
184         time_t t = time(NULL);
185
186         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
187
188         if ((attribute = replmd_find_attribute(req->op.add.message, "objectGUID")) != NULL ) {
189                 return ldb_next_request(module, req);
190         }
191
192         down_req = talloc(req, struct ldb_request);
193         if (down_req == NULL) {
194                 return LDB_ERR_OPERATIONS_ERROR;
195         }
196
197         *down_req = *req;
198
199         /* we have to copy the message as the caller might have it as a const */
200         down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
201         if (msg == NULL) {
202                 talloc_free(down_req);
203                 return LDB_ERR_OPERATIONS_ERROR;
204         }
205
206         /* a new GUID */
207         guid = GUID_random();
208
209         nt_status = ndr_push_struct_blob(&v, msg, &guid, 
210                                          (ndr_push_flags_fn_t)ndr_push_GUID);
211         if (!NT_STATUS_IS_OK(nt_status)) {
212                 talloc_free(down_req);
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ldb_msg_add_value(msg, "objectGUID", &v, NULL);
217         if (ret) {
218                 talloc_free(down_req);
219                 return ret;
220         }
221         
222         if (add_time_element(msg, "whenCreated", t) != 0 ||
223             add_time_element(msg, "whenChanged", t) != 0) {
224                 talloc_free(down_req);
225                 return LDB_ERR_OPERATIONS_ERROR;
226         }
227
228         /* Get a sequence number from the backend */
229         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
230         if (ret == LDB_SUCCESS) {
231                 if (add_uint64_element(msg, "uSNCreated", seq_num) != 0 ||
232                     add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
233                         talloc_free(down_req);
234                         return LDB_ERR_OPERATIONS_ERROR;
235                 }
236         }
237
238         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
239
240         /* go on with the call chain */
241         ret = ldb_next_request(module, down_req);
242
243         /* do not free down_req as the call results may be linked to it,
244          * it will be freed when the upper level request get freed */
245         if (ret == LDB_SUCCESS) {
246                 req->handle = down_req->handle;
247         }
248
249         return ret;
250 }
251
252 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
253 {
254         const struct dsdb_schema *schema;
255         const struct ldb_control *partition_ctrl;
256         const struct dsdb_control_current_partition *partition;
257
258         /* do not manipulate our control entries */
259         if (ldb_dn_is_special(req->op.add.message->dn)) {
260                 return ldb_next_request(module, req);
261         }
262
263         schema = dsdb_get_schema(module->ldb);
264         if (!schema) {
265                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
266                               "replmd_add: no dsdb_schema loaded");
267                 return LDB_ERR_CONSTRAINT_VIOLATION;
268         }
269
270         partition_ctrl = get_control_from_list(req->controls, DSDB_CONTROL_CURRENT_PARTITION_OID);
271         if (!partition_ctrl) {
272                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
273                               "replmd_add: no current partition control found");
274                 return LDB_ERR_CONSTRAINT_VIOLATION;
275         }
276
277         partition = talloc_get_type(partition_ctrl->data,
278                                     struct dsdb_control_current_partition);
279         if (!partition) {
280                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
281                               "replmd_add: current partition control contains invalid data");
282                 return LDB_ERR_CONSTRAINT_VIOLATION;
283         }
284
285         if (partition->version != DSDB_CONTROL_CURRENT_PARTITION_VERSION) {
286                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
287                               "replmd_add: current partition control contains invalid version [%u != %u]\n",
288                               partition->version, DSDB_CONTROL_CURRENT_PARTITION_VERSION);
289                 return LDB_ERR_CONSTRAINT_VIOLATION;
290         }
291
292         return replmd_add_originating(module, req, schema, partition);
293 }
294
295 static int replmd_modify_originating(struct ldb_module *module,
296                                      struct ldb_request *req,
297                                      const struct dsdb_schema *schema,
298                                      const struct dsdb_control_current_partition *partition)
299 {
300         struct ldb_request *down_req;
301         struct ldb_message *msg;
302         int ret;
303         time_t t = time(NULL);
304         uint64_t seq_num;
305
306         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
307
308         down_req = talloc(req, struct ldb_request);
309         if (down_req == NULL) {
310                 return LDB_ERR_OPERATIONS_ERROR;
311         }
312
313         *down_req = *req;
314
315         /* we have to copy the message as the caller might have it as a const */
316         down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
317         if (msg == NULL) {
318                 talloc_free(down_req);
319                 return LDB_ERR_OPERATIONS_ERROR;
320         }
321
322         if (add_time_element(msg, "whenChanged", t) != 0) {
323                 talloc_free(down_req);
324                 return LDB_ERR_OPERATIONS_ERROR;
325         }
326
327         /* Get a sequence number from the backend */
328         ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
329         if (ret == LDB_SUCCESS) {
330                 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
331                         talloc_free(down_req);
332                         return LDB_ERR_OPERATIONS_ERROR;
333                 }
334         }
335
336         ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
337
338         /* go on with the call chain */
339         ret = ldb_next_request(module, down_req);
340
341         /* do not free down_req as the call results may be linked to it,
342          * it will be freed when the upper level request get freed */
343         if (ret == LDB_SUCCESS) {
344                 req->handle = down_req->handle;
345         }
346
347         return ret;
348 }
349
350 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
351 {
352         const struct dsdb_schema *schema;
353         const struct ldb_control *partition_ctrl;
354         const struct dsdb_control_current_partition *partition;
355  
356         /* do not manipulate our control entries */
357         if (ldb_dn_is_special(req->op.mod.message->dn)) {
358                 return ldb_next_request(module, req);
359         }
360
361         schema = dsdb_get_schema(module->ldb);
362         if (!schema) {
363                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
364                               "replmd_modify: no dsdb_schema loaded");
365                 return LDB_ERR_CONSTRAINT_VIOLATION;
366         }
367
368         schema = dsdb_get_schema(module->ldb);
369         if (!schema) {
370                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
371                               "replmd_modify: no dsdb_schema loaded");
372                 return LDB_ERR_CONSTRAINT_VIOLATION;
373         }
374
375         partition_ctrl = get_control_from_list(req->controls, DSDB_CONTROL_CURRENT_PARTITION_OID);
376         if (!partition_ctrl) {
377                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
378                               "replmd_modify: no current partition control found");
379                 return LDB_ERR_CONSTRAINT_VIOLATION;
380         }
381
382         partition = talloc_get_type(partition_ctrl->data,
383                                     struct dsdb_control_current_partition);
384         if (!partition) {
385                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
386                               "replmd_modify: current partition control contains invalid data");
387                 return LDB_ERR_CONSTRAINT_VIOLATION;
388         }
389
390         if (partition->version != DSDB_CONTROL_CURRENT_PARTITION_VERSION) {
391                 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
392                               "replmd_modify: current partition control contains invalid version [%u != %u]\n",
393                               partition->version, DSDB_CONTROL_CURRENT_PARTITION_VERSION);
394                 return LDB_ERR_CONSTRAINT_VIOLATION;
395         }
396
397         return replmd_modify_originating(module, req, schema, partition);
398 }
399
400 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
401 {
402         struct ldb_reply *ares = NULL;
403
404         ar->handle->status = ret;
405         ar->handle->state = LDB_ASYNC_DONE;
406
407         if (!ar->orig_req->callback) {
408                 return LDB_SUCCESS;
409         }
410         
411         /* we're done and need to report the success to the caller */
412         ares = talloc_zero(ar, struct ldb_reply);
413         if (!ares) {
414                 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
415                 ar->handle->state = LDB_ASYNC_DONE;
416                 return LDB_ERR_OPERATIONS_ERROR;
417         }
418
419         ares->type      = LDB_REPLY_EXTENDED;
420         ares->response  = NULL;
421
422         return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
423 }
424
425 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
426 {
427         return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
428 }
429
430 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
431 {
432         return replmd_replicated_request_reply_helper(ar, ret);
433 }
434
435 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
436 {
437         int ret = LDB_ERR_OTHER;
438         /* TODO: do some error mapping */
439         return replmd_replicated_request_reply_helper(ar, ret);
440 }
441
442 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
443
444 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
445                                                 void *private_data,
446                                                 struct ldb_reply *ares)
447 {
448 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
449         struct replmd_replicated_request *ar = talloc_get_type(private_data,
450                                                struct replmd_replicated_request);
451
452         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
453         if (ar->sub.change_ret != LDB_SUCCESS) {
454                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
455         }
456
457         talloc_free(ar->sub.mem_ctx);
458         ZERO_STRUCT(ar->sub);
459
460         ar->index_current++;
461
462         return replmd_replicated_apply_next(ar);
463 #else
464         return LDB_SUCCESS;
465 #endif
466 }
467
468 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
469 {
470         NTSTATUS nt_status;
471         struct ldb_message *msg;
472         struct replPropertyMetaDataBlob *md;
473         struct ldb_val md_value;
474         uint32_t i;
475         uint64_t seq_num;
476         int ret;
477
478         /*
479          * TODO: check if the parent object exist
480          */
481
482         /*
483          * TODO: handle the conflict case where an object with the
484          *       same name exist
485          */
486
487         msg = ar->objs->objects[ar->index_current].msg;
488         md = ar->objs->objects[ar->index_current].meta_data;
489
490         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
491         if (ret != LDB_SUCCESS) {
492                 return replmd_replicated_request_error(ar, ret);
493         }
494
495         ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
496         if (ret != LDB_SUCCESS) {
497                 return replmd_replicated_request_error(ar, ret);
498         }
499
500         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
501         if (ret != LDB_SUCCESS) {
502                 return replmd_replicated_request_error(ar, ret);
503         }
504
505         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
506         if (ret != LDB_SUCCESS) {
507                 return replmd_replicated_request_error(ar, ret);
508         }
509
510         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
511         if (ret != LDB_SUCCESS) {
512                 return replmd_replicated_request_error(ar, ret);
513         }
514
515         md = ar->objs->objects[ar->index_current].meta_data;
516         for (i=0; i < md->ctr.ctr1.count; i++) {
517                 md->ctr.ctr1.array[i].local_usn = seq_num;
518         }
519         nt_status = ndr_push_struct_blob(&md_value, msg, md,
520                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
521         if (!NT_STATUS_IS_OK(nt_status)) {
522                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
523         }
524         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
525         if (ret != LDB_SUCCESS) {
526                 return replmd_replicated_request_error(ar, ret);
527         }
528
529         ret = ldb_build_add_req(&ar->sub.change_req,
530                                 ar->module->ldb,
531                                 ar->sub.mem_ctx,
532                                 msg,
533                                 NULL,
534                                 ar,
535                                 replmd_replicated_apply_add_callback);
536         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
537
538 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
539         return ldb_next_request(ar->module, ar->sub.change_req);
540 #else
541         ret = ldb_next_request(ar->module, ar->sub.change_req);
542         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
543
544         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
545         if (ar->sub.change_ret != LDB_SUCCESS) {
546                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
547         }
548
549         talloc_free(ar->sub.mem_ctx);
550         ZERO_STRUCT(ar->sub);
551
552         ar->index_current++;
553
554         return LDB_SUCCESS;
555 #endif
556 }
557
558 static int replmd_replPropertyMetaData1_attid_compare(struct replPropertyMetaData1 *m1,
559                                                       struct replPropertyMetaData1 *m2)
560 {
561         return m1->attid - m2->attid;
562 }
563
564 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
565                                                          struct replPropertyMetaData1 *m2)
566 {
567         int ret;
568
569         if (m1->version != m2->version) {
570                 return m1->version - m2->version;
571         }
572
573         if (m1->orginating_time != m2->orginating_time) {
574                 return m1->orginating_time - m2->orginating_time;
575         }
576
577         ret = GUID_compare(&m1->orginating_invocation_id, &m2->orginating_invocation_id);
578         if (ret != 0) {
579                 return ret;
580         }
581
582         return m1->orginating_usn - m2->orginating_usn;
583 }
584
585 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
586                                                   void *private_data,
587                                                   struct ldb_reply *ares)
588 {
589 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
590         struct replmd_replicated_request *ar = talloc_get_type(private_data,
591                                                struct replmd_replicated_request);
592
593         ret = ldb_next_request(ar->module, ar->sub.change_req);
594         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
595
596         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
597         if (ar->sub.change_ret != LDB_SUCCESS) {
598                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
599         }
600
601         talloc_free(ar->sub.mem_ctx);
602         ZERO_STRUCT(ar->sub);
603
604         ar->index_current++;
605
606         return LDB_SUCCESS;
607 #else
608         return LDB_SUCCESS;
609 #endif
610 }
611
612 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
613 {
614         NTSTATUS nt_status;
615         struct ldb_message *msg;
616         struct replPropertyMetaDataBlob *rmd;
617         struct replPropertyMetaDataBlob omd;
618         const struct ldb_val *omd_value;
619         struct replPropertyMetaDataBlob nmd;
620         struct ldb_val nmd_value;
621         uint32_t i,j,ni=0;
622         uint32_t removed_attrs = 0;
623         uint64_t seq_num;
624         int ret;
625
626         msg = ar->objs->objects[ar->index_current].msg;
627         rmd = ar->objs->objects[ar->index_current].meta_data;
628         ZERO_STRUCT(omd);
629         omd.version = 1;
630
631         /*
632          * TODO: add rename conflict handling
633          */
634         if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
635                 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
636                               ar->index_current);
637                 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
638                           ldb_dn_get_linearized(ar->sub.search_msg->dn),
639                           ldb_dn_get_linearized(msg->dn));
640                 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
641         }
642
643         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
644         if (ret != LDB_SUCCESS) {
645                 return replmd_replicated_request_error(ar, ret);
646         }
647
648         /* find existing meta data */
649         omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
650         if (omd_value) {
651                 nt_status = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, &omd,
652                                                  (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
653                 if (!NT_STATUS_IS_OK(nt_status)) {
654                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
655                 }
656
657                 if (omd.version != 1) {
658                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
659                 }
660         }
661
662         ZERO_STRUCT(nmd);
663         nmd.version = 1;
664         nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
665         nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
666                                           struct replPropertyMetaData1,
667                                           nmd.ctr.ctr1.count);
668         if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
669
670         /* first copy the old meta data */
671         for (i=0; i < omd.ctr.ctr1.count; i++) {
672                 nmd.ctr.ctr1.array[ni]  = omd.ctr.ctr1.array[i];
673                 ni++;
674         }
675
676         /* now merge in the new meta data */
677         for (i=0; i < rmd->ctr.ctr1.count; i++) {
678                 bool found = false;
679
680                 rmd->ctr.ctr1.array[i].local_usn = seq_num;
681
682                 for (j=0; j < ni; j++) {
683                         int cmp;
684
685                         if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
686                                 continue;
687                         }
688
689                         cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
690                                                                             &nmd.ctr.ctr1.array[j]);
691                         if (cmp > 0) {
692                                 /* replace the entry */
693                                 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
694                                 found = true;
695                                 break;
696                         }
697
698                         /* we don't want to apply this change so remove the attribute */
699                         ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
700                         removed_attrs++;
701
702                         found = true;
703                         break;
704                 }
705
706                 if (found) continue;
707
708                 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
709                 ni++;
710         }
711
712         /*
713          * finally correct the size of the meta_data array
714          */
715         nmd.ctr.ctr1.count = ni;
716
717         /*
718          * the rdn attribute (the alias for the name attribute),
719          * 'cn' for most objects is the last entry in the meta data array
720          * we have stored
721          *
722          * as it should stay the last one in the new list, we move it to the end
723          */
724         {
725                 struct replPropertyMetaData1 *rdn_p, rdn, *last_p;
726                 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
727                 uint32_t last_idx = ni - 1;
728
729                 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
730                 rdn = *rdn_p;
731                 last_p = &nmd.ctr.ctr1.array[last_idx];
732
733                 if (last_idx > rdn_idx) {
734                         memmove(rdn_p, rdn_p+1, (last_idx - rdn_idx)*sizeof(rdn));
735                         *last_p = rdn;
736                 }
737         }
738
739         /*
740          * sort the meta data entries by attid, but skip the last one containing
741          * the rdn attribute
742          */
743         qsort(nmd.ctr.ctr1.array, nmd.ctr.ctr1.count - 1,
744               sizeof(struct replPropertyMetaData1),
745               (comparison_fn_t)replmd_replPropertyMetaData1_attid_compare);
746
747         /* create the meta data value */
748         nt_status = ndr_push_struct_blob(&nmd_value, msg, &nmd,
749                                          (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
750         if (!NT_STATUS_IS_OK(nt_status)) {
751                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
752         }
753
754         /*
755          * check if some replicated attributes left, otherwise skip the ldb_modify() call
756          */
757         if (msg->num_elements == 0) {
758                 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
759                           ar->index_current);
760                 goto next_object;
761         }
762
763         ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
764                   ar->index_current, msg->num_elements);
765
766         /*
767          * when we now that we'll modify the record, add the whenChanged, uSNChanged
768          * and replPopertyMetaData attributes
769          */
770         ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
771         if (ret != LDB_SUCCESS) {
772                 return replmd_replicated_request_error(ar, ret);
773         }
774         ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
775         if (ret != LDB_SUCCESS) {
776                 return replmd_replicated_request_error(ar, ret);
777         }
778         ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
779         if (ret != LDB_SUCCESS) {
780                 return replmd_replicated_request_error(ar, ret);
781         }
782
783         /* we want to replace the old values */
784         for (i=0; i < msg->num_elements; i++) {
785                 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
786         }
787
788         ret = ldb_build_mod_req(&ar->sub.change_req,
789                                 ar->module->ldb,
790                                 ar->sub.mem_ctx,
791                                 msg,
792                                 NULL,
793                                 ar,
794                                 replmd_replicated_apply_merge_callback);
795         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
796
797 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
798         return ldb_next_request(ar->module, ar->sub.change_req);
799 #else
800         ret = ldb_next_request(ar->module, ar->sub.change_req);
801         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
802
803         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
804         if (ar->sub.change_ret != LDB_SUCCESS) {
805                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
806         }
807
808 next_object:
809         talloc_free(ar->sub.mem_ctx);
810         ZERO_STRUCT(ar->sub);
811
812         ar->index_current++;
813
814         return LDB_SUCCESS;
815 #endif
816 }
817
818 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
819                                                    void *private_data,
820                                                    struct ldb_reply *ares)
821 {
822         struct replmd_replicated_request *ar = talloc_get_type(private_data,
823                                                struct replmd_replicated_request);
824         bool is_done = false;
825
826         switch (ares->type) {
827         case LDB_REPLY_ENTRY:
828                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
829                 break;
830         case LDB_REPLY_REFERRAL:
831                 /* we ignore referrals */
832                 break;
833         case LDB_REPLY_EXTENDED:
834         case LDB_REPLY_DONE:
835                 is_done = true;
836         }
837
838         talloc_free(ares);
839
840 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
841         if (is_done) {
842                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
843                 if (ar->sub.search_ret != LDB_SUCCESS) {
844                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
845                 }
846                 if (ar->sub.search_msg) {
847                         return replmd_replicated_apply_merge(ar);
848                 }
849                 return replmd_replicated_apply_add(ar);
850         }
851 #endif
852         return LDB_SUCCESS;
853 }
854
855 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
856 {
857         int ret;
858         char *tmp_str;
859         char *filter;
860
861         tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
862         if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
863
864         filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
865         if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
866         talloc_free(tmp_str);
867
868         ret = ldb_build_search_req(&ar->sub.search_req,
869                                    ar->module->ldb,
870                                    ar->sub.mem_ctx,
871                                    ar->objs->partition_dn,
872                                    LDB_SCOPE_SUBTREE,
873                                    filter,
874                                    NULL,
875                                    NULL,
876                                    ar,
877                                    replmd_replicated_apply_search_callback);
878         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
879
880 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
881         return ldb_next_request(ar->module, ar->sub.search_req);
882 #else
883         ret = ldb_next_request(ar->module, ar->sub.search_req);
884         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
885
886         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
887         if (ar->sub.search_ret != LDB_SUCCESS) {
888                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
889         }
890         if (ar->sub.search_msg) {
891                 return replmd_replicated_apply_merge(ar);
892         }
893
894         return replmd_replicated_apply_add(ar);
895 #endif
896 }
897
898 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
899 {
900 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
901         if (ar->index_current >= ar->objs->num_objects) {
902                 return replmd_replicated_uptodate_vector(ar);
903         }
904 #endif
905
906         ar->sub.mem_ctx = talloc_new(ar);
907         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
908
909         return replmd_replicated_apply_search(ar);
910 }
911
912 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
913                                                       void *private_data,
914                                                       struct ldb_reply *ares)
915 {
916 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
917         struct replmd_replicated_request *ar = talloc_get_type(private_data,
918                                                struct replmd_replicated_request);
919
920         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
921         if (ar->sub.change_ret != LDB_SUCCESS) {
922                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
923         }
924
925         talloc_free(ar->sub.mem_ctx);
926         ZERO_STRUCT(ar->sub);
927
928         return replmd_replicated_request_done(ar);
929 #else
930         return LDB_SUCCESS;
931 #endif
932 }
933
934 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
935                                                    const struct drsuapi_DsReplicaCursor2 *c2)
936 {
937         return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
938 }
939
940 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
941 {
942         NTSTATUS nt_status;
943         struct ldb_message *msg;
944         struct replUpToDateVectorBlob ouv;
945         const struct ldb_val *ouv_value;
946         const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
947         struct replUpToDateVectorBlob nuv;
948         struct ldb_val nuv_value;
949         struct ldb_message_element *nuv_el = NULL;
950         const struct GUID *our_invocation_id;
951         struct ldb_message_element *orf_el = NULL;
952         struct repsFromToBlob nrf;
953         struct ldb_val *nrf_value = NULL;
954         struct ldb_message_element *nrf_el = NULL;
955         uint32_t i,j,ni=0;
956         uint64_t seq_num;
957         bool found = false;
958         time_t t = time(NULL);
959         NTTIME now;
960         int ret;
961
962         ruv = ar->objs->uptodateness_vector;
963         ZERO_STRUCT(ouv);
964         ouv.version = 2;
965         ZERO_STRUCT(nuv);
966         nuv.version = 2;
967
968         unix_to_nt_time(&now, t);
969
970         /* 
971          * we use the next sequence number for our own highest_usn
972          * because we will do a modify request and this will increment
973          * our highest_usn
974          */
975         ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
976         if (ret != LDB_SUCCESS) {
977                 return replmd_replicated_request_error(ar, ret);
978         }
979
980         /*
981          * first create the new replUpToDateVector
982          */
983         ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
984         if (ouv_value) {
985                 nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
986                                                  (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
987                 if (!NT_STATUS_IS_OK(nt_status)) {
988                         return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
989                 }
990
991                 if (ouv.version != 2) {
992                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
993                 }
994         }
995
996         /*
997          * the new uptodateness vector will at least
998          * contain 2 entries, one for the source_dsa and one the local server
999          *
1000          * plus optional values from our old vector and the one from the source_dsa
1001          */
1002         nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
1003         if (ruv) nuv.ctr.ctr2.count += ruv->count;
1004         nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
1005                                             struct drsuapi_DsReplicaCursor2,
1006                                             nuv.ctr.ctr2.count);
1007         if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1008
1009         /* first copy the old vector */
1010         for (i=0; i < ouv.ctr.ctr2.count; i++) {
1011                 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1012                 ni++;
1013         }
1014
1015         /* merge in the source_dsa vector is available */
1016         for (i=0; (ruv && i < ruv->count); i++) {
1017                 found = false;
1018
1019                 for (j=0; j < ni; j++) {
1020                         if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1021                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1022                                 continue;
1023                         }
1024
1025                         found = true;
1026
1027                         /*
1028                          * we update only the highest_usn and not the latest_sync_success time,
1029                          * because the last success stands for direct replication
1030                          */
1031                         if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1032                                 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1033                         }
1034                         break;                  
1035                 }
1036
1037                 if (found) continue;
1038
1039                 /* if it's not there yet, add it */
1040                 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1041                 ni++;
1042         }
1043
1044         /*
1045          * merge in the current highwatermark for the source_dsa
1046          */
1047         found = false;
1048         for (j=0; j < ni; j++) {
1049                 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1050                                 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1051                         continue;
1052                 }
1053
1054                 found = true;
1055
1056                 /*
1057                  * here we update the highest_usn and last_sync_success time
1058                  * because we're directly replicating from the source_dsa
1059                  *
1060                  * and use the tmp_highest_usn because this is what we have just applied
1061                  * to our ldb
1062                  */
1063                 nuv.ctr.ctr2.cursors[j].highest_usn             = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1064                 nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1065                 break;
1066         }
1067         if (!found) {
1068                 /*
1069                  * here we update the highest_usn and last_sync_success time
1070                  * because we're directly replicating from the source_dsa
1071                  *
1072                  * and use the tmp_highest_usn because this is what we have just applied
1073                  * to our ldb
1074                  */
1075                 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1076                 nuv.ctr.ctr2.cursors[ni].highest_usn            = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1077                 nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1078                 ni++;
1079         }
1080
1081         /*
1082          * merge our own current values if we have a invocation_id already
1083          * attached to the ldb
1084          */
1085         our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1086         if (our_invocation_id) {
1087                 found = false;
1088                 for (j=0; j < ni; j++) {
1089                         if (!GUID_equal(our_invocation_id,
1090                                         &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1091                                 continue;
1092                         }
1093
1094                         found = true;
1095
1096                         /*
1097                          * here we update the highest_usn and last_sync_success time
1098                          * because it's our own entry
1099                          */
1100                         nuv.ctr.ctr2.cursors[j].highest_usn             = seq_num;
1101                         nuv.ctr.ctr2.cursors[j].last_sync_success       = now;
1102                         break;
1103                 }
1104                 if (!found) {
1105                         /*
1106                          * here we update the highest_usn and last_sync_success time
1107                          * because it's our own entry
1108                          */
1109                         nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
1110                         nuv.ctr.ctr2.cursors[ni].highest_usn            = seq_num;
1111                         nuv.ctr.ctr2.cursors[ni].last_sync_success      = now;
1112                         ni++;
1113                 }
1114         }
1115
1116         /*
1117          * finally correct the size of the cursors array
1118          */
1119         nuv.ctr.ctr2.count = ni;
1120
1121         /*
1122          * sort the cursors
1123          */
1124         qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1125               sizeof(struct drsuapi_DsReplicaCursor2),
1126               (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1127
1128         /*
1129          * create the change ldb_message
1130          */
1131         msg = ldb_msg_new(ar->sub.mem_ctx);
1132         if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1133         msg->dn = ar->sub.search_msg->dn;
1134
1135         nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
1136                                          (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1137         if (!NT_STATUS_IS_OK(nt_status)) {
1138                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1139         }
1140         ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1141         if (ret != LDB_SUCCESS) {
1142                 return replmd_replicated_request_error(ar, ret);
1143         }
1144         nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1145
1146         /*
1147          * now create the new repsFrom value from the given repsFromTo1 structure
1148          */
1149         ZERO_STRUCT(nrf);
1150         nrf.version                                     = 1;
1151         nrf.ctr.ctr1                                    = *ar->objs->source_dsa;
1152         /* and fix some values... */
1153         nrf.ctr.ctr1.consecutive_sync_failures          = 0;
1154         nrf.ctr.ctr1.last_success                       = now;
1155         nrf.ctr.ctr1.last_attempt                       = now;
1156         nrf.ctr.ctr1.result_last_attempt                = WERR_OK;
1157         nrf.ctr.ctr1.highwatermark.highest_usn          = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1158
1159         /*
1160          * first see if we already have a repsFrom value for the current source dsa
1161          * if so we'll later replace this value
1162          */
1163         orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1164         if (orf_el) {
1165                 for (i=0; i < orf_el->num_values; i++) {
1166                         struct repsFromToBlob *trf;
1167
1168                         trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1169                         if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1170
1171                         nt_status = ndr_pull_struct_blob(&orf_el->values[i], trf, trf,
1172                                                          (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1173                         if (!NT_STATUS_IS_OK(nt_status)) {
1174                                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1175                         }
1176
1177                         if (trf->version != 1) {
1178                                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1179                         }
1180
1181                         /*
1182                          * we compare the source dsa objectGUID not the invocation_id
1183                          * because we want only one repsFrom value per source dsa
1184                          * and when the invocation_id of the source dsa has changed we don't need 
1185                          * the old repsFrom with the old invocation_id
1186                          */
1187                         if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1188                                         &ar->objs->source_dsa->source_dsa_obj_guid)) {
1189                                 talloc_free(trf);
1190                                 continue;
1191                         }
1192
1193                         talloc_free(trf);
1194                         nrf_value = &orf_el->values[i];
1195                         break;
1196                 }
1197
1198                 /*
1199                  * copy over all old values to the new ldb_message
1200                  */
1201                 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1202                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1203                 *nrf_el = *orf_el;
1204         }
1205
1206         /*
1207          * if we haven't found an old repsFrom value for the current source dsa
1208          * we'll add a new value
1209          */
1210         if (!nrf_value) {
1211                 struct ldb_val zero_value;
1212                 ZERO_STRUCT(zero_value);
1213                 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1214                 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1215
1216                 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1217         }
1218
1219         /* we now fill the value which is already attached to ldb_message */
1220         nt_status = ndr_push_struct_blob(nrf_value, msg, &nrf,
1221                                          (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1222         if (!NT_STATUS_IS_OK(nt_status)) {
1223                 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1224         }
1225
1226         /* 
1227          * the ldb_message_element for the attribute, has all the old values and the new one
1228          * so we'll replace the whole attribute with all values
1229          */
1230         nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1231
1232         /* prepare the ldb_modify() request */
1233         ret = ldb_build_mod_req(&ar->sub.change_req,
1234                                 ar->module->ldb,
1235                                 ar->sub.mem_ctx,
1236                                 msg,
1237                                 NULL,
1238                                 ar,
1239                                 replmd_replicated_uptodate_modify_callback);
1240         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1241
1242 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1243         return ldb_next_request(ar->module, ar->sub.change_req);
1244 #else
1245         ret = ldb_next_request(ar->module, ar->sub.change_req);
1246         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1247
1248         ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1249         if (ar->sub.change_ret != LDB_SUCCESS) {
1250                 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1251         }
1252
1253         talloc_free(ar->sub.mem_ctx);
1254         ZERO_STRUCT(ar->sub);
1255
1256         return replmd_replicated_request_done(ar);
1257 #endif
1258 }
1259
1260 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1261                                                       void *private_data,
1262                                                       struct ldb_reply *ares)
1263 {
1264         struct replmd_replicated_request *ar = talloc_get_type(private_data,
1265                                                struct replmd_replicated_request);
1266         bool is_done = false;
1267
1268         switch (ares->type) {
1269         case LDB_REPLY_ENTRY:
1270                 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1271                 break;
1272         case LDB_REPLY_REFERRAL:
1273                 /* we ignore referrals */
1274                 break;
1275         case LDB_REPLY_EXTENDED:
1276         case LDB_REPLY_DONE:
1277                 is_done = true;
1278         }
1279
1280         talloc_free(ares);
1281
1282 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1283         if (is_done) {
1284                 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1285                 if (ar->sub.search_ret != LDB_SUCCESS) {
1286                         return replmd_replicated_request_error(ar, ar->sub.search_ret);
1287                 }
1288                 if (!ar->sub.search_msg) {
1289                         return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1290                 }
1291
1292                 return replmd_replicated_uptodate_modify(ar);
1293         }
1294 #endif
1295         return LDB_SUCCESS;
1296 }
1297
1298 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
1299 {
1300         int ret;
1301         static const char *attrs[] = {
1302                 "replUpToDateVector",
1303                 "repsFrom",
1304                 NULL
1305         };
1306
1307         ret = ldb_build_search_req(&ar->sub.search_req,
1308                                    ar->module->ldb,
1309                                    ar->sub.mem_ctx,
1310                                    ar->objs->partition_dn,
1311                                    LDB_SCOPE_BASE,
1312                                    "(objectClass=*)",
1313                                    attrs,
1314                                    NULL,
1315                                    ar,
1316                                    replmd_replicated_uptodate_search_callback);
1317         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1318
1319 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1320         return ldb_next_request(ar->module, ar->sub.search_req);
1321 #else
1322         ret = ldb_next_request(ar->module, ar->sub.search_req);
1323         if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1324
1325         ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1326         if (ar->sub.search_ret != LDB_SUCCESS) {
1327                 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1328         }
1329         if (!ar->sub.search_msg) {
1330                 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1331         }
1332
1333         return replmd_replicated_uptodate_modify(ar);
1334 #endif
1335 }
1336
1337 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1338 {
1339         ar->sub.mem_ctx = talloc_new(ar);
1340         if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1341
1342         return replmd_replicated_uptodate_search(ar);
1343 }
1344
1345 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1346 {
1347         struct dsdb_extended_replicated_objects *objs;
1348         struct replmd_replicated_request *ar;
1349
1350         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
1351
1352         objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
1353         if (!objs) {
1354                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1355                 return LDB_ERR_PROTOCOL_ERROR;
1356         }
1357
1358         if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1359                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1360                           objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1361                 return LDB_ERR_PROTOCOL_ERROR;
1362         }
1363
1364         ar = replmd_replicated_init_handle(module, req, objs);
1365         if (!ar) {
1366                 return LDB_ERR_OPERATIONS_ERROR;
1367         }
1368
1369 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
1370         return replmd_replicated_apply_next(ar);
1371 #else
1372         while (ar->index_current < ar->objs->num_objects &&
1373                req->handle->state != LDB_ASYNC_DONE) { 
1374                 replmd_replicated_apply_next(ar);
1375         }
1376
1377         if (req->handle->state != LDB_ASYNC_DONE) {
1378                 replmd_replicated_uptodate_vector(ar);
1379         }
1380
1381         return LDB_SUCCESS;
1382 #endif
1383 }
1384
1385 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1386 {
1387         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1388                 return replmd_extended_replicated_objects(module, req);
1389         }
1390
1391         return ldb_next_request(module, req);
1392 }
1393
1394 static int replmd_wait_none(struct ldb_handle *handle) {
1395         struct replmd_replicated_request *ar;
1396     
1397         if (!handle || !handle->private_data) {
1398                 return LDB_ERR_OPERATIONS_ERROR;
1399         }
1400
1401         ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
1402         if (!ar) {
1403                 return LDB_ERR_OPERATIONS_ERROR;
1404         }
1405
1406         /* we do only sync calls */
1407         if (handle->state != LDB_ASYNC_DONE) {
1408                 return LDB_ERR_OPERATIONS_ERROR;
1409         }
1410
1411         return handle->status;
1412 }
1413
1414 static int replmd_wait_all(struct ldb_handle *handle) {
1415
1416         int ret;
1417
1418         while (handle->state != LDB_ASYNC_DONE) {
1419                 ret = replmd_wait_none(handle);
1420                 if (ret != LDB_SUCCESS) {
1421                         return ret;
1422                 }
1423         }
1424
1425         return handle->status;
1426 }
1427
1428 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1429 {
1430         if (type == LDB_WAIT_ALL) {
1431                 return replmd_wait_all(handle);
1432         } else {
1433                 return replmd_wait_none(handle);
1434         }
1435 }
1436
1437 static const struct ldb_module_ops replmd_ops = {
1438         .name          = "repl_meta_data",
1439         .add           = replmd_add,
1440         .modify        = replmd_modify,
1441         .extended      = replmd_extended,
1442         .wait          = replmd_wait
1443 };
1444
1445 int repl_meta_data_module_init(void)
1446 {
1447         return ldb_register_module(&replmd_ops);
1448 }