4 Copyright (C) Simo Sorce 2004-2006
5 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2005
6 Copyright (C) Andrew Tridgell 2005
7 Copyright (C) Stefan Metzmacher 2007
9 ** NOTE! The following LGPL license applies to the ldb
10 ** library. This does NOT imply that all of Samba is released
13 This library is free software; you can redistribute it and/or
14 modify it under the terms of the GNU Lesser General Public
15 License as published by the Free Software Foundation; either
16 version 2 of the License, or (at your option) any later version.
18 This library is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
23 You should have received a copy of the GNU Lesser General Public
24 License along with this library; if not, write to the Free Software
25 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
31 * Component: ldb repl_meta_data module
33 * Description: - add a unique objectGUID onto every new record,
34 * - handle whenCreated, whenChanged timestamps
35 * - handle uSNCreated, uSNChanged numbers
36 * - handle replPropertyMetaData attribute
39 * Author: Stefan Metzmacher
43 #include "lib/ldb/include/ldb.h"
44 #include "lib/ldb/include/ldb_errors.h"
45 #include "lib/ldb/include/ldb_private.h"
46 #include "dsdb/samdb/samdb.h"
47 #include "librpc/gen_ndr/ndr_misc.h"
48 #include "librpc/gen_ndr/ndr_drsuapi.h"
49 #include "librpc/gen_ndr/ndr_drsblobs.h"
/*
 * Per-request state used while applying a set of replicated objects.
 *
 * NOTE(review): the functions in this file also access ar->sub.mem_ctx,
 * ar->sub.search_ret and ar->sub.change_ret, and reach search_req,
 * search_msg and change_req through ar->sub; the lines declaring that
 * nested "sub" member are not visible in this excerpt.
 */
51 struct replmd_replicated_request {
52 struct ldb_module *module;
53 struct ldb_handle *handle;
/* the request this state was created for; its callback/context are used to report completion */
54 struct ldb_request *orig_req;
/* the replicated objects to apply (objects[], num_objects, partition_dn, source_dsa, ...) */
56 struct dsdb_extended_replicated_objects *objs;
/* index into objs->objects of the object currently being applied */
58 uint32_t index_current;
62 struct ldb_request *search_req;
63 struct ldb_message *search_msg;
65 struct ldb_request *change_req;
/*
 * Allocate the ldb_handle (zeroed, talloc child of req) and the
 * replmd_replicated_request state (zeroed, talloc child of the handle)
 * for a replicated-objects request.  The handle starts out as
 * LDB_ASYNC_PENDING with status LDB_SUCCESS.
 *
 * NOTE(review): the allocation-failure checks around the two
 * "Out of Memory" error strings, the initialisation of the remaining
 * fields and the return statements are not visible in this excerpt.
 */
70 static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
71 struct ldb_request *req,
72 struct dsdb_extended_replicated_objects *objs)
74 struct replmd_replicated_request *ar;
77 h = talloc_zero(req, struct ldb_handle);
79 ldb_set_errstring(module->ldb, "Out of Memory");
84 h->state = LDB_ASYNC_PENDING;
85 h->status = LDB_SUCCESS;
87 ar = talloc_zero(h, struct replmd_replicated_request);
89 ldb_set_errstring(module->ldb, "Out of Memory");
106 static struct ldb_message_element *replmd_find_attribute(const struct ldb_message *msg, const char *name)
110 for (i = 0; i < msg->num_elements; i++) {
111 if (ldb_attr_cmp(name, msg->elements[i].name) == 0) {
112 return &msg->elements[i];
/*
 * Add attribute attr to msg, holding time t formatted by
 * ldb_timestring(), and flag the resulting element
 * LDB_FLAG_MOD_REPLACE.  When msg already carries attr the function
 * takes an early exit instead (the body of that branch is not visible
 * in this excerpt).  Callers in this file treat a non-zero return as
 * failure.
 */
120 add a time element to a record
122 static int add_time_element(struct ldb_message *msg, const char *attr, time_t t)
124 struct ldb_message_element *el;
127 if (ldb_msg_find_element(msg, attr) != NULL) {
131 s = ldb_timestring(msg, t);
136 if (ldb_msg_add_string(msg, attr, s) != 0) {
/* NOTE(review): no NULL check on el is visible before it is dereferenced below */
140 el = ldb_msg_find_element(msg, attr);
141 /* always set as replace. This works because on add ops, the flag
143 el->flags = LDB_FLAG_MOD_REPLACE;
/*
 * Add attribute attr to msg, holding v printed as an unsigned decimal
 * ("%llu") via ldb_msg_add_fmt(), and flag the resulting element
 * LDB_FLAG_MOD_REPLACE.  When msg already carries attr the function
 * takes an early exit instead (the body of that branch is not visible
 * in this excerpt).  Callers in this file treat a non-zero return as
 * failure.
 */
149 add a uint64_t element to a record
151 static int add_uint64_element(struct ldb_message *msg, const char *attr, uint64_t v)
153 struct ldb_message_element *el;
155 if (ldb_msg_find_element(msg, attr) != NULL) {
159 if (ldb_msg_add_fmt(msg, attr, "%llu", (unsigned long long)v) != 0) {
/* NOTE(review): no NULL check on el is visible before it is dereferenced below */
163 el = ldb_msg_find_element(msg, attr);
164 /* always set as replace. This works because on add ops, the flag
166 el->flags = LDB_FLAG_MOD_REPLACE;
/*
 * Handle a locally originated add request.
 *
 * A message that already carries objectGUID is passed down the module
 * chain unchanged.  Otherwise a new request (talloc child of req) is
 * built around a shallow copy of the message, to which are added:
 *   - objectGUID: a random GUID, NDR-encoded
 *   - whenCreated / whenChanged: the current time
 *   - uSNCreated / uSNChanged: the next backend sequence number
 * The copy is then sent down the chain and, on success, its handle is
 * handed to the original request.
 *
 * NOTE(review): schema and partition are not referenced by any line
 * visible in this excerpt.
 */
171 static int replmd_add_originating(struct ldb_module *module,
172 struct ldb_request *req,
173 const struct dsdb_schema *schema,
174 const struct dsdb_control_current_partition *partition)
176 struct ldb_request *down_req;
177 struct ldb_message_element *attribute;
178 struct ldb_message *msg;
184 time_t t = time(NULL);
186 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_add_originating\n");
/* caller supplied its own objectGUID: nothing to add, pass the request through */
188 if ((attribute = replmd_find_attribute(req->op.add.message, "objectGUID")) != NULL ) {
189 return ldb_next_request(module, req);
192 down_req = talloc(req, struct ldb_request);
193 if (down_req == NULL) {
194 return LDB_ERR_OPERATIONS_ERROR;
199 /* we have to copy the message as the caller might have it as a const */
200 down_req->op.add.message = msg = ldb_msg_copy_shallow(down_req, req->op.add.message);
202 talloc_free(down_req);
203 return LDB_ERR_OPERATIONS_ERROR;
207 guid = GUID_random();
/* the encoded GUID blob is allocated on msg, so it is released together with down_req */
209 nt_status = ndr_push_struct_blob(&v, msg, &guid,
210 (ndr_push_flags_fn_t)ndr_push_GUID);
211 if (!NT_STATUS_IS_OK(nt_status)) {
212 talloc_free(down_req);
213 return LDB_ERR_OPERATIONS_ERROR;
216 ret = ldb_msg_add_value(msg, "objectGUID", &v, NULL);
218 talloc_free(down_req);
222 if (add_time_element(msg, "whenCreated", t) != 0 ||
223 add_time_element(msg, "whenChanged", t) != 0) {
224 talloc_free(down_req);
225 return LDB_ERR_OPERATIONS_ERROR;
228 /* Get a sequence number from the backend */
229 ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
/* NOTE(review): when the sequence number cannot be obtained the uSN attributes are skipped; no error handling for that case is visible here */
230 if (ret == LDB_SUCCESS) {
231 if (add_uint64_element(msg, "uSNCreated", seq_num) != 0 ||
232 add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
233 talloc_free(down_req);
234 return LDB_ERR_OPERATIONS_ERROR;
238 ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
240 /* go on with the call chain */
241 ret = ldb_next_request(module, down_req);
243 /* do not free down_req as the call results may be linked to it,
244 * it will be freed when the upper level request get freed */
245 if (ret == LDB_SUCCESS) {
246 req->handle = down_req->handle;
252 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
254 const struct dsdb_schema *schema;
255 const struct ldb_control *partition_ctrl;
256 const struct dsdb_control_current_partition *partition;
258 /* do not manipulate our control entries */
259 if (ldb_dn_is_special(req->op.add.message->dn)) {
260 return ldb_next_request(module, req);
263 schema = dsdb_get_schema(module->ldb);
265 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
266 "replmd_add: no dsdb_schema loaded");
267 return LDB_ERR_CONSTRAINT_VIOLATION;
270 partition_ctrl = get_control_from_list(req->controls, DSDB_CONTROL_CURRENT_PARTITION_OID);
271 if (!partition_ctrl) {
272 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
273 "replmd_add: no current partition control found");
274 return LDB_ERR_CONSTRAINT_VIOLATION;
277 partition = talloc_get_type(partition_ctrl->data,
278 struct dsdb_control_current_partition);
280 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
281 "replmd_add: current partition control contains invalid data");
282 return LDB_ERR_CONSTRAINT_VIOLATION;
285 if (partition->version != DSDB_CONTROL_CURRENT_PARTITION_VERSION) {
286 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
287 "replmd_add: current partition control contains invalid version [%u != %u]\n",
288 partition->version, DSDB_CONTROL_CURRENT_PARTITION_VERSION);
289 return LDB_ERR_CONSTRAINT_VIOLATION;
292 return replmd_add_originating(module, req, schema, partition);
/*
 * Handle a locally originated modify request.
 *
 * A new request (talloc child of req) is built around a shallow copy
 * of the message, to which whenChanged (current time) and uSNChanged
 * (next backend sequence number) are added.  The copy is sent down the
 * module chain and, on success, its handle is handed to the original
 * request.
 *
 * NOTE(review): schema and partition are not referenced by any line
 * visible in this excerpt.
 */
295 static int replmd_modify_originating(struct ldb_module *module,
296 struct ldb_request *req,
297 const struct dsdb_schema *schema,
298 const struct dsdb_control_current_partition *partition)
300 struct ldb_request *down_req;
301 struct ldb_message *msg;
303 time_t t = time(NULL);
306 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_modify_originating\n");
308 down_req = talloc(req, struct ldb_request);
309 if (down_req == NULL) {
310 return LDB_ERR_OPERATIONS_ERROR;
315 /* we have to copy the message as the caller might have it as a const */
316 down_req->op.mod.message = msg = ldb_msg_copy_shallow(down_req, req->op.mod.message);
318 talloc_free(down_req);
319 return LDB_ERR_OPERATIONS_ERROR;
322 if (add_time_element(msg, "whenChanged", t) != 0) {
323 talloc_free(down_req);
324 return LDB_ERR_OPERATIONS_ERROR;
327 /* Get a sequence number from the backend */
328 ret = ldb_sequence_number(module->ldb, LDB_SEQ_NEXT, &seq_num);
/* NOTE(review): when the sequence number cannot be obtained uSNChanged is skipped; no error handling for that case is visible here */
329 if (ret == LDB_SUCCESS) {
330 if (add_uint64_element(msg, "uSNChanged", seq_num) != 0) {
331 talloc_free(down_req);
332 return LDB_ERR_OPERATIONS_ERROR;
336 ldb_set_timeout_from_prev_req(module->ldb, req, down_req);
338 /* go on with the call chain */
339 ret = ldb_next_request(module, down_req);
341 /* do not free down_req as the call results may be linked to it,
342 * it will be freed when the upper level request get freed */
343 if (ret == LDB_SUCCESS) {
344 req->handle = down_req->handle;
350 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
352 const struct dsdb_schema *schema;
353 const struct ldb_control *partition_ctrl;
354 const struct dsdb_control_current_partition *partition;
356 /* do not manipulate our control entries */
357 if (ldb_dn_is_special(req->op.mod.message->dn)) {
358 return ldb_next_request(module, req);
361 schema = dsdb_get_schema(module->ldb);
363 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
364 "replmd_modify: no dsdb_schema loaded");
365 return LDB_ERR_CONSTRAINT_VIOLATION;
368 schema = dsdb_get_schema(module->ldb);
370 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
371 "replmd_modify: no dsdb_schema loaded");
372 return LDB_ERR_CONSTRAINT_VIOLATION;
375 partition_ctrl = get_control_from_list(req->controls, DSDB_CONTROL_CURRENT_PARTITION_OID);
376 if (!partition_ctrl) {
377 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
378 "replmd_modify: no current partition control found");
379 return LDB_ERR_CONSTRAINT_VIOLATION;
382 partition = talloc_get_type(partition_ctrl->data,
383 struct dsdb_control_current_partition);
385 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
386 "replmd_modify: current partition control contains invalid data");
387 return LDB_ERR_CONSTRAINT_VIOLATION;
390 if (partition->version != DSDB_CONTROL_CURRENT_PARTITION_VERSION) {
391 ldb_debug_set(module->ldb, LDB_DEBUG_FATAL,
392 "replmd_modify: current partition control contains invalid version [%u != %u]\n",
393 partition->version, DSDB_CONTROL_CURRENT_PARTITION_VERSION);
394 return LDB_ERR_CONSTRAINT_VIOLATION;
397 return replmd_modify_originating(module, req, schema, partition);
/*
 * Finish the replicated-objects request with result ret.
 *
 * Stores ret in the handle and marks it LDB_ASYNC_DONE.  When the
 * original request has a callback, a zeroed ldb_reply of type
 * LDB_REPLY_EXTENDED with a NULL response is allocated on ar and passed
 * to that callback together with the original request's context; the
 * callback's return value is returned.
 *
 * NOTE(review): the return taken when no callback is set, and the
 * condition guarding the LDB_ERR_OPERATIONS_ERROR path, are not visible
 * in this excerpt.
 */
400 static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
402 struct ldb_reply *ares = NULL;
404 ar->handle->status = ret;
405 ar->handle->state = LDB_ASYNC_DONE;
407 if (!ar->orig_req->callback) {
411 /* we're done and need to report the success to the caller */
412 ares = talloc_zero(ar, struct ldb_reply);
414 ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
415 ar->handle->state = LDB_ASYNC_DONE;
416 return LDB_ERR_OPERATIONS_ERROR;
419 ares->type = LDB_REPLY_EXTENDED;
420 ares->response = NULL;
422 return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
425 static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
427 return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
/*
  Finish the replicated-objects request with the ldb error code ret:
  reports it through replmd_replicated_request_reply_helper() and
  returns whatever the helper returns.
*/
static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
{
	int helper_result;

	helper_result = replmd_replicated_request_reply_helper(ar, ret);

	return helper_result;
}
435 static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
437 int ret = LDB_ERR_OTHER;
438 /* TODO: do some error mapping */
439 return replmd_replicated_request_reply_helper(ar, ret);
442 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
/*
 * Completion callback of the add request built by
 * replmd_replicated_apply_add().
 *
 * With REPLMD_FULL_ASYNC defined it waits for the request, reports a
 * failure through replmd_replicated_request_error(), otherwise frees
 * the per-object context, clears ar->sub and continues with
 * replmd_replicated_apply_next().
 *
 * NOTE(review): the middle parameter line and the non-async branch are
 * not visible in this excerpt.
 */
444 static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
446 struct ldb_reply *ares)
448 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
449 struct replmd_replicated_request *ar = talloc_get_type(private_data,
450 struct replmd_replicated_request);
/* NOTE(review): waits on search_req's handle although this is the callback of the change request; looks like change_req was intended - confirm */
452 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
453 if (ar->sub.change_ret != LDB_SUCCESS) {
454 return replmd_replicated_request_error(ar, ar->sub.change_ret);
457 talloc_free(ar->sub.mem_ctx);
458 ZERO_STRUCT(ar->sub);
462 return replmd_replicated_apply_next(ar);
/*
 * Apply the current replicated object as a new record.
 *
 * Adds to the object's message: objectGUID (from the object's
 * guid_value), whenChanged (the object's when_changed string),
 * uSNCreated / uSNChanged (the next local sequence number) and
 * replPropertyMetaData (the object's meta data, NDR-encoded after
 * every entry's local_usn has been set to that sequence number).
 * An add request is then built and sent down the module chain.
 * Every failure is reported through replmd_replicated_request_error()
 * or replmd_replicated_request_werror().
 *
 * NOTE(review): the remaining declarations, the arguments of
 * ldb_build_add_req() and the end of the function are not visible in
 * this excerpt.
 */
468 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
471 struct ldb_message *msg;
472 struct replPropertyMetaDataBlob *md;
473 struct ldb_val md_value;
479 * TODO: check if the parent object exist
483 * TODO: handle the conflict case where an object with the
487 msg = ar->objs->objects[ar->index_current].msg;
488 md = ar->objs->objects[ar->index_current].meta_data;
490 ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
491 if (ret != LDB_SUCCESS) {
492 return replmd_replicated_request_error(ar, ret);
495 ret = ldb_msg_add_value(msg, "objectGUID", &ar->objs->objects[ar->index_current].guid_value, NULL);
496 if (ret != LDB_SUCCESS) {
497 return replmd_replicated_request_error(ar, ret);
500 ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
501 if (ret != LDB_SUCCESS) {
502 return replmd_replicated_request_error(ar, ret);
505 ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
506 if (ret != LDB_SUCCESS) {
507 return replmd_replicated_request_error(ar, ret);
510 ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
511 if (ret != LDB_SUCCESS) {
512 return replmd_replicated_request_error(ar, ret);
/* NOTE(review): md was already set to this same value above; this second assignment looks redundant */
515 md = ar->objs->objects[ar->index_current].meta_data;
/* stamp every meta data entry with the sequence number used for this add */
516 for (i=0; i < md->ctr.ctr1.count; i++) {
517 md->ctr.ctr1.array[i].local_usn = seq_num;
519 nt_status = ndr_push_struct_blob(&md_value, msg, md,
520 (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
521 if (!NT_STATUS_IS_OK(nt_status)) {
522 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
524 ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
525 if (ret != LDB_SUCCESS) {
526 return replmd_replicated_request_error(ar, ret);
529 ret = ldb_build_add_req(&ar->sub.change_req,
535 replmd_replicated_apply_add_callback);
536 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
538 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
539 return ldb_next_request(ar->module, ar->sub.change_req);
541 ret = ldb_next_request(ar->module, ar->sub.change_req);
542 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
/* NOTE(review): change_req was just submitted, yet the wait is on search_req's handle; looks like change_req->handle was intended - confirm */
544 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
545 if (ar->sub.change_ret != LDB_SUCCESS) {
546 return replmd_replicated_request_error(ar, ar->sub.change_ret);
549 talloc_free(ar->sub.mem_ctx);
550 ZERO_STRUCT(ar->sub);
558 static int replmd_replPropertyMetaData1_attid_compare(struct replPropertyMetaData1 *m1,
559 struct replPropertyMetaData1 *m2)
561 return m1->attid - m2->attid;
564 static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMetaData1 *m1,
565 struct replPropertyMetaData1 *m2)
569 if (m1->version != m2->version) {
570 return m1->version - m2->version;
573 if (m1->orginating_time != m2->orginating_time) {
574 return m1->orginating_time - m2->orginating_time;
577 ret = GUID_compare(&m1->orginating_invocation_id, &m2->orginating_invocation_id);
582 return m1->orginating_usn - m2->orginating_usn;
/*
 * Completion callback of the modify request built by
 * replmd_replicated_apply_merge().
 *
 * The lines visible under REPLMD_FULL_ASYNC send change_req down the
 * module chain, wait for completion, report failures through
 * replmd_replicated_request_error(), then free the per-object context
 * and clear ar->sub.
 *
 * NOTE(review): the middle parameter line, the non-async branch and
 * the end of the function are not visible in this excerpt.
 */
585 static int replmd_replicated_apply_merge_callback(struct ldb_context *ldb,
587 struct ldb_reply *ares)
589 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
590 struct replmd_replicated_request *ar = talloc_get_type(private_data,
591 struct replmd_replicated_request);
593 ret = ldb_next_request(ar->module, ar->sub.change_req);
594 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
/* NOTE(review): waits on search_req's handle right after submitting change_req; looks like change_req->handle was intended - confirm */
596 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
597 if (ar->sub.change_ret != LDB_SUCCESS) {
598 return replmd_replicated_request_error(ar, ar->sub.change_ret);
601 talloc_free(ar->sub.mem_ctx);
602 ZERO_STRUCT(ar->sub);
/*
 * Merge the current replicated object into the record found by the
 * preceding search (ar->sub.search_msg).
 *
 * Steps visible in this excerpt:
 *   - refuse the change when the replicated DN differs from the stored
 *     DN (renames are not supported)
 *   - fetch the next local sequence number
 *   - decode the stored replPropertyMetaData (omd; version must be 1)
 *   - build the merged meta data (nmd): copy the stored entries, then
 *     for each replicated entry (rmd) either replace the stored entry
 *     with the same attid, drop the attribute from the message, or
 *     append the entry, depending on
 *     replmd_replPropertyMetaData1_conflict_compare()
 *   - move the rdn entry to the end, sort all other entries by attid
 *     and NDR-encode the result
 *   - when attributes are left in the message, add whenChanged,
 *     uSNChanged and replPropertyMetaData, flag every element
 *     LDB_FLAG_MOD_REPLACE and send a modify request down the chain
 *
 * NOTE(review): several declarations, the branches taken on the result
 * of the conflict comparison, the arguments of ldb_build_mod_req() and
 * the end of the function are not visible in this excerpt.
 */
612 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
615 struct ldb_message *msg;
616 struct replPropertyMetaDataBlob *rmd;
617 struct replPropertyMetaDataBlob omd;
618 const struct ldb_val *omd_value;
619 struct replPropertyMetaDataBlob nmd;
620 struct ldb_val nmd_value;
622 uint32_t removed_attrs = 0;
626 msg = ar->objs->objects[ar->index_current].msg;
627 rmd = ar->objs->objects[ar->index_current].meta_data;
632 * TODO: add rename conflict handling
634 if (ldb_dn_compare(msg->dn, ar->sub.search_msg->dn) != 0) {
635 ldb_debug_set(ar->module->ldb, LDB_DEBUG_FATAL, "replmd_replicated_apply_merge[%u]: rename not supported",
637 ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL, "%s => %s\n",
638 ldb_dn_get_linearized(ar->sub.search_msg->dn),
639 ldb_dn_get_linearized(msg->dn));
640 return replmd_replicated_request_werror(ar, WERR_NOT_SUPPORTED);
643 ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
644 if (ret != LDB_SUCCESS) {
645 return replmd_replicated_request_error(ar, ret);
648 /* find existing meta data */
649 omd_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replPropertyMetaData");
651 nt_status = ndr_pull_struct_blob(omd_value, ar->sub.mem_ctx, &omd,
652 (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
653 if (!NT_STATUS_IS_OK(nt_status)) {
654 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
657 if (omd.version != 1) {
658 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
/* worst case size: no attid is shared between the stored and the replicated meta data */
664 nmd.ctr.ctr1.count = omd.ctr.ctr1.count + rmd->ctr.ctr1.count;
665 nmd.ctr.ctr1.array = talloc_array(ar->sub.mem_ctx,
666 struct replPropertyMetaData1,
668 if (!nmd.ctr.ctr1.array) return replmd_replicated_request_werror(ar, WERR_NOMEM);
670 /* first copy the old meta data */
671 for (i=0; i < omd.ctr.ctr1.count; i++) {
672 nmd.ctr.ctr1.array[ni] = omd.ctr.ctr1.array[i];
676 /* now merge in the new meta data */
677 for (i=0; i < rmd->ctr.ctr1.count; i++) {
680 rmd->ctr.ctr1.array[i].local_usn = seq_num;
682 for (j=0; j < ni; j++) {
685 if (rmd->ctr.ctr1.array[i].attid != nmd.ctr.ctr1.array[j].attid) {
689 cmp = replmd_replPropertyMetaData1_conflict_compare(&rmd->ctr.ctr1.array[i],
690 &nmd.ctr.ctr1.array[j]);
692 /* replace the entry */
693 nmd.ctr.ctr1.array[j] = rmd->ctr.ctr1.array[i];
698 /* we don't want to apply this change so remove the attribute */
/* NOTE(review): indexes msg->elements with the meta data index i (minus the elements removed so far); presumably relies on msg->elements being in the same order as rmd's entries - confirm */
699 ldb_msg_remove_element(msg, &msg->elements[i-removed_attrs]);
708 nmd.ctr.ctr1.array[ni] = rmd->ctr.ctr1.array[i];
713 * finally correct the size of the meta_data array
715 nmd.ctr.ctr1.count = ni;
718 * the rdn attribute (the alias for the name attribute),
719 * 'cn' for most objects is the last entry in the meta data array
722 * as it should stay the last one in the new list, we move it to the end
725 struct replPropertyMetaData1 *rdn_p, rdn, *last_p;
/* NOTE(review): both "count - 1" and "ni - 1" wrap around when the respective count is 0; no guard is visible in this excerpt */
726 uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
727 uint32_t last_idx = ni - 1;
729 rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
731 last_p = &nmd.ctr.ctr1.array[last_idx];
733 if (last_idx > rdn_idx) {
734 memmove(rdn_p, rdn_p+1, (last_idx - rdn_idx)*sizeof(rdn));
740 * sort the meta data entries by attid, but skip the last one containing
743 qsort(nmd.ctr.ctr1.array, nmd.ctr.ctr1.count - 1,
744 sizeof(struct replPropertyMetaData1),
745 (comparison_fn_t)replmd_replPropertyMetaData1_attid_compare);
747 /* create the meta data value */
748 nt_status = ndr_push_struct_blob(&nmd_value, msg, &nmd,
749 (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
750 if (!NT_STATUS_IS_OK(nt_status)) {
751 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
755 * check if some replicated attributes left, otherwise skip the ldb_modify() call
757 if (msg->num_elements == 0) {
758 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: skip replace\n",
763 ldb_debug(ar->module->ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
764 ar->index_current, msg->num_elements);
767 * when we now that we'll modify the record, add the whenChanged, uSNChanged
768 * and replPopertyMetaData attributes
770 ret = ldb_msg_add_string(msg, "whenChanged", ar->objs->objects[ar->index_current].when_changed);
771 if (ret != LDB_SUCCESS) {
772 return replmd_replicated_request_error(ar, ret);
774 ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
775 if (ret != LDB_SUCCESS) {
776 return replmd_replicated_request_error(ar, ret);
778 ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
779 if (ret != LDB_SUCCESS) {
780 return replmd_replicated_request_error(ar, ret);
783 /* we want to replace the old values */
784 for (i=0; i < msg->num_elements; i++) {
785 msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
788 ret = ldb_build_mod_req(&ar->sub.change_req,
794 replmd_replicated_apply_merge_callback);
795 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
797 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
798 return ldb_next_request(ar->module, ar->sub.change_req);
800 ret = ldb_next_request(ar->module, ar->sub.change_req);
801 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
/* NOTE(review): change_req was just submitted, yet the wait is on search_req's handle; looks like change_req->handle was intended - confirm */
803 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
804 if (ar->sub.change_ret != LDB_SUCCESS) {
805 return replmd_replicated_request_error(ar, ar->sub.change_ret);
809 talloc_free(ar->sub.mem_ctx);
810 ZERO_STRUCT(ar->sub);
/*
 * Reply callback of the objectGUID search built by
 * replmd_replicated_apply_search().
 *
 * An entry reply is kept as ar->sub.search_msg (re-parented onto the
 * per-object context); referrals are ignored.  With REPLMD_FULL_ASYNC
 * defined, once the search has completed the object is merged into the
 * found record (replmd_replicated_apply_merge()) or, when nothing was
 * found, added as a new record (replmd_replicated_apply_add()).
 *
 * NOTE(review): the middle parameter line, the remaining switch cases
 * (including how is_done gets set) and the non-async branch are not
 * visible in this excerpt.
 */
818 static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
820 struct ldb_reply *ares)
822 struct replmd_replicated_request *ar = talloc_get_type(private_data,
823 struct replmd_replicated_request);
824 bool is_done = false;
826 switch (ares->type) {
827 case LDB_REPLY_ENTRY:
828 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
830 case LDB_REPLY_REFERRAL:
831 /* we ignore referrals */
833 case LDB_REPLY_EXTENDED:
840 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
842 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
843 if (ar->sub.search_ret != LDB_SUCCESS) {
844 return replmd_replicated_request_error(ar, ar->sub.search_ret);
846 if (ar->sub.search_msg) {
847 return replmd_replicated_apply_merge(ar);
849 return replmd_replicated_apply_add(ar);
/*
 * Search for an existing record carrying the objectGUID of the current
 * replicated object.
 *
 * Builds the filter "(objectGUID=<binary-encoded guid>)", creates the
 * search request (ar->objs->partition_dn is among its arguments) and
 * sends it down the module chain.  In the non-async path it waits for
 * the result and then either merges into the found record
 * (replmd_replicated_apply_merge()) or adds a new one
 * (replmd_replicated_apply_add()).
 *
 * NOTE(review): the declarations and most arguments of
 * ldb_build_search_req() are not visible in this excerpt.
 */
855 static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
861 tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
862 if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
864 filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
865 if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
/* the encoded GUID has been copied into filter and is no longer needed */
866 talloc_free(tmp_str);
868 ret = ldb_build_search_req(&ar->sub.search_req,
871 ar->objs->partition_dn,
877 replmd_replicated_apply_search_callback);
878 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
880 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
881 return ldb_next_request(ar->module, ar->sub.search_req);
883 ret = ldb_next_request(ar->module, ar->sub.search_req);
884 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
886 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
887 if (ar->sub.search_ret != LDB_SUCCESS) {
888 return replmd_replicated_request_error(ar, ar->sub.search_ret);
890 if (ar->sub.search_msg) {
891 return replmd_replicated_apply_merge(ar);
894 return replmd_replicated_apply_add(ar);
/*
 * Start processing the object at ar->index_current: creates a fresh
 * per-object talloc context (ar->sub.mem_ctx, child of ar) and starts
 * the objectGUID search for that object.
 *
 * With REPLMD_FULL_ASYNC defined, once all objects have been handled
 * (index_current >= num_objects) processing continues with
 * replmd_replicated_uptodate_vector() instead.
 *
 * NOTE(review): the end of the conditional section is not visible in
 * this excerpt.
 */
898 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
900 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
901 if (ar->index_current >= ar->objs->num_objects) {
902 return replmd_replicated_uptodate_vector(ar);
906 ar->sub.mem_ctx = talloc_new(ar);
907 if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
909 return replmd_replicated_apply_search(ar);
/*
 * Completion callback of the modify request built by
 * replmd_replicated_uptodate_modify().
 *
 * With REPLMD_FULL_ASYNC defined it waits for the request, reports a
 * failure through replmd_replicated_request_error(), otherwise frees
 * the sub-request context, clears ar->sub and finishes the whole
 * request with replmd_replicated_request_done().
 *
 * NOTE(review): the middle parameter line and the non-async branch are
 * not visible in this excerpt.
 */
912 static int replmd_replicated_uptodate_modify_callback(struct ldb_context *ldb,
914 struct ldb_reply *ares)
916 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
917 struct replmd_replicated_request *ar = talloc_get_type(private_data,
918 struct replmd_replicated_request);
/* NOTE(review): waits on search_req's handle although this is the callback of the change request; looks like change_req was intended - confirm */
920 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
921 if (ar->sub.change_ret != LDB_SUCCESS) {
922 return replmd_replicated_request_error(ar, ar->sub.change_ret);
925 talloc_free(ar->sub.mem_ctx);
926 ZERO_STRUCT(ar->sub);
928 return replmd_replicated_request_done(ar);
934 static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
935 const struct drsuapi_DsReplicaCursor2 *c2)
937 return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
/*
 * Update replUpToDateVector and repsFrom on the record found by the
 * preceding search (ar->sub.search_msg) after the replicated objects
 * have been applied.
 *
 * Steps visible in this excerpt:
 *   - fetch the next local sequence number and the current time
 *   - decode the stored replUpToDateVector (ouv; version must be 2)
 *   - build the new vector (nuv): copy the stored cursors, merge in the
 *     cursors received from the source DSA (only raising highest_usn),
 *     set the source DSA's own cursor to its tmp_highest_usn and the
 *     current time, and set our own cursor (when an invocation id is
 *     attached to the ldb) to the sequence number and the current time
 *   - sort the cursors by invocation id and NDR-encode the vector
 *   - build the new repsFrom value from ar->objs->source_dsa, replacing
 *     the stored value with the same source DSA objectGUID or appending
 *     a new value
 *   - send a modify request replacing both attributes and finish the
 *     request with replmd_replicated_request_done()
 *
 * NOTE(review): several declarations, the "found" bookkeeping of the
 * merge loops, the copying of old repsFrom values, the arguments of
 * ldb_build_mod_req() and the preprocessor branches are only partly
 * visible in this excerpt.
 */
940 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
943 struct ldb_message *msg;
944 struct replUpToDateVectorBlob ouv;
945 const struct ldb_val *ouv_value;
946 const struct drsuapi_DsReplicaCursor2CtrEx *ruv;
947 struct replUpToDateVectorBlob nuv;
948 struct ldb_val nuv_value;
949 struct ldb_message_element *nuv_el = NULL;
950 const struct GUID *our_invocation_id;
951 struct ldb_message_element *orf_el = NULL;
952 struct repsFromToBlob nrf;
953 struct ldb_val *nrf_value = NULL;
954 struct ldb_message_element *nrf_el = NULL;
958 time_t t = time(NULL);
962 ruv = ar->objs->uptodateness_vector;
968 unix_to_nt_time(&now, t);
971 * we use the next sequence number for our own highest_usn
972 * because we will do a modify request and this will increment
975 ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
976 if (ret != LDB_SUCCESS) {
977 return replmd_replicated_request_error(ar, ret);
981 * first create the new replUpToDateVector
983 ouv_value = ldb_msg_find_ldb_val(ar->sub.search_msg, "replUpToDateVector");
985 nt_status = ndr_pull_struct_blob(ouv_value, ar->sub.mem_ctx, &ouv,
986 (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
987 if (!NT_STATUS_IS_OK(nt_status)) {
988 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
991 if (ouv.version != 2) {
992 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
997 * the new uptodateness vector will at least
998 * contain 2 entries, one for the source_dsa and one the local server
1000 * plus optional values from our old vector and the one from the source_dsa
1002 nuv.ctr.ctr2.count = 2 + ouv.ctr.ctr2.count;
1003 if (ruv) nuv.ctr.ctr2.count += ruv->count;
1004 nuv.ctr.ctr2.cursors = talloc_array(ar->sub.mem_ctx,
1005 struct drsuapi_DsReplicaCursor2,
1006 nuv.ctr.ctr2.count);
1007 if (!nuv.ctr.ctr2.cursors) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1009 /* first copy the old vector */
1010 for (i=0; i < ouv.ctr.ctr2.count; i++) {
1011 nuv.ctr.ctr2.cursors[ni] = ouv.ctr.ctr2.cursors[i];
1015 /* merge in the source_dsa vector is available */
1016 for (i=0; (ruv && i < ruv->count); i++) {
1019 for (j=0; j < ni; j++) {
1020 if (!GUID_equal(&ruv->cursors[i].source_dsa_invocation_id,
1021 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1028 * we update only the highest_usn and not the latest_sync_success time,
1029 * because the last success stands for direct replication
1031 if (ruv->cursors[i].highest_usn > nuv.ctr.ctr2.cursors[j].highest_usn) {
1032 nuv.ctr.ctr2.cursors[j].highest_usn = ruv->cursors[i].highest_usn;
1037 if (found) continue;
1039 /* if it's not there yet, add it */
1040 nuv.ctr.ctr2.cursors[ni] = ruv->cursors[i];
1045 * merge in the current highwatermark for the source_dsa
1048 for (j=0; j < ni; j++) {
1049 if (!GUID_equal(&ar->objs->source_dsa->source_dsa_invocation_id,
1050 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1057 * here we update the highest_usn and last_sync_success time
1058 * because we're directly replicating from the source_dsa
1060 * and use the tmp_highest_usn because this is what we have just applied
1063 nuv.ctr.ctr2.cursors[j].highest_usn = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1064 nuv.ctr.ctr2.cursors[j].last_sync_success = now;
1069 * here we update the highest_usn and last_sync_success time
1070 * because we're directly replicating from the source_dsa
1072 * and use the tmp_highest_usn because this is what we have just applied
1075 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= ar->objs->source_dsa->source_dsa_invocation_id;
1076 nuv.ctr.ctr2.cursors[ni].highest_usn = ar->objs->source_dsa->highwatermark.tmp_highest_usn;
1077 nuv.ctr.ctr2.cursors[ni].last_sync_success = now;
1082 * merge our own current values if we have a invocation_id already
1083 * attached to the ldb
1085 our_invocation_id = samdb_ntds_invocation_id(ar->module->ldb);
1086 if (our_invocation_id) {
1088 for (j=0; j < ni; j++) {
1089 if (!GUID_equal(our_invocation_id,
1090 &nuv.ctr.ctr2.cursors[j].source_dsa_invocation_id)) {
1097 * here we update the highest_usn and last_sync_success time
1098 * because it's our own entry
1100 nuv.ctr.ctr2.cursors[j].highest_usn = seq_num;
1101 nuv.ctr.ctr2.cursors[j].last_sync_success = now;
1106 * here we update the highest_usn and last_sync_success time
1107 * because it's our own entry
1109 nuv.ctr.ctr2.cursors[ni].source_dsa_invocation_id= *our_invocation_id;
1110 nuv.ctr.ctr2.cursors[ni].highest_usn = seq_num;
1111 nuv.ctr.ctr2.cursors[ni].last_sync_success = now;
1117 * finally correct the size of the cursors array
1119 nuv.ctr.ctr2.count = ni;
1124 qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
1125 sizeof(struct drsuapi_DsReplicaCursor2),
1126 (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
1129 * create the change ldb_message
1131 msg = ldb_msg_new(ar->sub.mem_ctx);
1132 if (!msg) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1133 msg->dn = ar->sub.search_msg->dn;
1135 nt_status = ndr_push_struct_blob(&nuv_value, msg, &nuv,
1136 (ndr_push_flags_fn_t)ndr_push_replUpToDateVectorBlob);
1137 if (!NT_STATUS_IS_OK(nt_status)) {
1138 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1140 ret = ldb_msg_add_value(msg, "replUpToDateVector", &nuv_value, &nuv_el);
1141 if (ret != LDB_SUCCESS) {
1142 return replmd_replicated_request_error(ar, ret);
1144 nuv_el->flags = LDB_FLAG_MOD_REPLACE;
1147 * now create the new repsFrom value from the given repsFromTo1 structure
1151 nrf.ctr.ctr1 = *ar->objs->source_dsa;
1152 /* and fix some values... */
1153 nrf.ctr.ctr1.consecutive_sync_failures = 0;
1154 nrf.ctr.ctr1.last_success = now;
1155 nrf.ctr.ctr1.last_attempt = now;
1156 nrf.ctr.ctr1.result_last_attempt = WERR_OK;
1157 nrf.ctr.ctr1.highwatermark.highest_usn = nrf.ctr.ctr1.highwatermark.tmp_highest_usn;
1160 * first see if we already have a repsFrom value for the current source dsa
1161 * if so we'll later replace this value
1163 orf_el = ldb_msg_find_element(ar->sub.search_msg, "repsFrom");
1165 for (i=0; i < orf_el->num_values; i++) {
1166 struct repsFromToBlob *trf;
1168 trf = talloc(ar->sub.mem_ctx, struct repsFromToBlob);
1169 if (!trf) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1171 nt_status = ndr_pull_struct_blob(&orf_el->values[i], trf, trf,
1172 (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
1173 if (!NT_STATUS_IS_OK(nt_status)) {
1174 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1177 if (trf->version != 1) {
1178 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1182 * we compare the source dsa objectGUID not the invocation_id
1183 * because we want only one repsFrom value per source dsa
1184 * and when the invocation_id of the source dsa has changed we don't need
1185 * the old repsFrom with the old invocation_id
1187 if (!GUID_equal(&trf->ctr.ctr1.source_dsa_obj_guid,
1188 &ar->objs->source_dsa->source_dsa_obj_guid)) {
1194 nrf_value = &orf_el->values[i];
1199 * copy over all old values to the new ldb_message
1201 ret = ldb_msg_add_empty(msg, "repsFrom", 0, &nrf_el);
1202 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1207 * if we haven't found an old repsFrom value for the current source dsa
1208 * we'll add a new value
1211 struct ldb_val zero_value;
1212 ZERO_STRUCT(zero_value);
1213 ret = ldb_msg_add_value(msg, "repsFrom", &zero_value, &nrf_el);
1214 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1216 nrf_value = &nrf_el->values[nrf_el->num_values - 1];
1219 /* we now fill the value which is already attached to ldb_message */
1220 nt_status = ndr_push_struct_blob(nrf_value, msg, &nrf,
1221 (ndr_push_flags_fn_t)ndr_push_repsFromToBlob);
1222 if (!NT_STATUS_IS_OK(nt_status)) {
1223 return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
1227 * the ldb_message_element for the attribute, has all the old values and the new one
1228 * so we'll replace the whole attribute with all values
1230 nrf_el->flags = LDB_FLAG_MOD_REPLACE;
1232 /* prepare the ldb_modify() request */
1233 ret = ldb_build_mod_req(&ar->sub.change_req,
1239 replmd_replicated_uptodate_modify_callback);
1240 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1242 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1243 return ldb_next_request(ar->module, ar->sub.change_req);
1245 ret = ldb_next_request(ar->module, ar->sub.change_req);
1246 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
/* NOTE(review): change_req was just submitted, yet the wait is on search_req's handle; looks like change_req->handle was intended - confirm */
1248 ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1249 if (ar->sub.change_ret != LDB_SUCCESS) {
1250 return replmd_replicated_request_error(ar, ar->sub.change_ret);
1253 talloc_free(ar->sub.mem_ctx);
1254 ZERO_STRUCT(ar->sub);
1256 return replmd_replicated_request_done(ar);
/*
 * Reply callback attached to the search request that
 * replmd_replicated_uptodate_search() builds.
 *
 * private_data is converted back to the struct replmd_replicated_request
 * with talloc_get_type(); ares is the reply being delivered and is
 * dispatched on ares->type.
 *
 * NOTE(review): neither ares nor the converted request pointer is checked
 * for NULL before it is dereferenced below - confirm the caller guarantees
 * both.
 */
1260 static int replmd_replicated_uptodate_search_callback(struct ldb_context *ldb,
1262 struct ldb_reply *ares)
1264 struct replmd_replicated_request *ar = talloc_get_type(private_data,
1265 struct replmd_replicated_request);
1266 bool is_done = false;
1268 switch (ares->type) {
1269 case LDB_REPLY_ENTRY:
/*
 * take over the returned entry: it is re-parented onto ar->sub.mem_ctx
 * and remembered as the search result
 */
1270 ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
1272 case LDB_REPLY_REFERRAL:
1273 /* we ignore referrals */
1275 case LDB_REPLY_EXTENDED:
1276 case LDB_REPLY_DONE:
/*
 * Only when REPLMD_FULL_ASYNC is defined the callback itself drives the
 * next step: wait on the search handle, fail the request if the wait
 * failed or no entry was received (WERR_DS_DRA_INTERNAL_ERROR), otherwise
 * continue with replmd_replicated_uptodate_modify().
 * NOTE(review): presumably this only runs once the final reply arrived
 * (is_done) - confirm.
 */
1282 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1284 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1285 if (ar->sub.search_ret != LDB_SUCCESS) {
1286 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1288 if (!ar->sub.search_msg) {
1289 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1292 return replmd_replicated_uptodate_modify(ar);
/*
 * Search step of the up-to-date handling.
 *
 * Builds a search request on ar->objs->partition_dn (stored in
 * ar->sub.search_req, replies are delivered to
 * replmd_replicated_uptodate_search_callback) and sends it with
 * ldb_next_request().
 *
 * Failures of the ldb calls are reported through
 * replmd_replicated_request_error(); a search that completes without
 * delivering an entry is reported as WERR_DS_DRA_INTERNAL_ERROR through
 * replmd_replicated_request_werror().
 */
1298 static int replmd_replicated_uptodate_search(struct replmd_replicated_request *ar)
/* attributes requested from the partition object */
1301 static const char *attrs[] = {
1302 "replUpToDateVector",
1307 ret = ldb_build_search_req(&ar->sub.search_req,
1310 ar->objs->partition_dn,
1316 replmd_replicated_uptodate_search_callback);
1317 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
/* full async variant: only send the request, the callback continues the work */
1319 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1320 return ldb_next_request(ar->module, ar->sub.search_req);
/*
 * synchronous variant: send the request, block until the search handle
 * has completed, then require a result entry before moving on to
 * replmd_replicated_uptodate_modify()
 */
1322 ret = ldb_next_request(ar->module, ar->sub.search_req);
1323 if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
1325 ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
1326 if (ar->sub.search_ret != LDB_SUCCESS) {
1327 return replmd_replicated_request_error(ar, ar->sub.search_ret);
1329 if (!ar->sub.search_msg) {
1330 return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
1333 return replmd_replicated_uptodate_modify(ar);
1337 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar)
1339 ar->sub.mem_ctx = talloc_new(ar);
1340 if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
1342 return replmd_replicated_uptodate_search(ar);
/*
 * Handler for the DSDB_EXTENDED_REPLICATED_OBJECTS_OID extended operation
 * (dispatched from replmd_extended()).
 *
 * req->op.extended.data has to be a struct dsdb_extended_replicated_objects
 * whose version equals DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION, otherwise
 * LDB_ERR_PROTOCOL_ERROR is returned. LDB_ERR_OPERATIONS_ERROR is returned
 * on the path following replmd_replicated_init_handle().
 */
1345 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
1347 struct dsdb_extended_replicated_objects *objs;
1348 struct replmd_replicated_request *ar;
1350 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
/* the payload is only accepted as a talloc'ed struct dsdb_extended_replicated_objects */
1352 objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
/* NOTE(review): presumably taken when the type check above yielded NULL - confirm */
1354 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: invalid extended data\n");
1355 return LDB_ERR_PROTOCOL_ERROR;
/* refuse any structure version other than the one this module was built against */
1358 if (objs->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
1359 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "replmd_extended_replicated_objects: extended data invalid version [%u != %u]\n",
1360 objs->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
1361 return LDB_ERR_PROTOCOL_ERROR;
/* create the struct replmd_replicated_request the following steps work on */
1364 ar = replmd_replicated_init_handle(module, req, objs);
/* NOTE(review): presumably taken when no request state could be created - confirm */
1366 return LDB_ERR_OPERATIONS_ERROR;
/* full async variant: start with the first object and return its result */
1369 #ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */
1370 return replmd_replicated_apply_next(ar);
/*
 * synchronous variant: keep calling replmd_replicated_apply_next() while
 * ar->index_current is below ar->objs->num_objects and the request handle
 * has not been marked LDB_ASYNC_DONE.
 * NOTE(review): the return value of replmd_replicated_apply_next() is
 * discarded; the loop only ends through index_current or the handle state
 * changing - confirm every path in apply_next updates one of them.
 */
1372 while (ar->index_current < ar->objs->num_objects &&
1373 req->handle->state != LDB_ASYNC_DONE) {
1374 replmd_replicated_apply_next(ar);
/*
 * if the handle is still not done after the loop, run the up-to-date step;
 * its return value is discarded as well
 */
1377 if (req->handle->state != LDB_ASYNC_DONE) {
1378 replmd_replicated_uptodate_vector(ar);
1385 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
1387 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1388 return replmd_extended_replicated_objects(module, req);
1391 return ldb_next_request(module, req);
/*
 * Non-blocking wait: report the outcome of a request handled by this module.
 *
 * Returns LDB_ERR_OPERATIONS_ERROR when handle or handle->private_data is
 * NULL, on the path following the talloc_get_type() conversion, and when
 * handle->state is not LDB_ASYNC_DONE; otherwise returns handle->status.
 */
1394 static int replmd_wait_none(struct ldb_handle *handle) {
1395 struct replmd_replicated_request *ar;
1397 if (!handle || !handle->private_data) {
1398 return LDB_ERR_OPERATIONS_ERROR;
/* the handle must carry this module's request state */
1401 ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
/* NOTE(review): presumably taken when the conversion above yielded NULL - confirm */
1403 return LDB_ERR_OPERATIONS_ERROR;
1406 /* we do only sync calls */
/* so a handle that is not finished at this point is treated as an error */
1407 if (handle->state != LDB_ASYNC_DONE) {
1408 return LDB_ERR_OPERATIONS_ERROR;
1411 return handle->status;
/*
 * Blocking flavour of the wait operation: calls replmd_wait_none() while
 * handle->state is not LDB_ASYNC_DONE and finally returns handle->status.
 *
 * NOTE(review): handle is dereferenced in the loop condition before
 * replmd_wait_none() gets the chance to reject a NULL handle - confirm
 * callers never pass NULL.
 */
1414 static int replmd_wait_all(struct ldb_handle *handle) {
1418 while (handle->state != LDB_ASYNC_DONE) {
1419 ret = replmd_wait_none(handle);
/* a failure reported by replmd_wait_none() is handled in this branch */
1420 if (ret != LDB_SUCCESS) {
1425 return handle->status;
1428 static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
1430 if (type == LDB_WAIT_ALL) {
1431 return replmd_wait_all(handle);
1433 return replmd_wait_none(handle);
/*
 * Operations table of the "repl_meta_data" ldb module; it is handed to
 * ldb_register_module() by repl_meta_data_module_init().
 */
1437 static const struct ldb_module_ops replmd_ops = {
1438 .name = "repl_meta_data",
1440 .modify = replmd_modify,
1441 .extended = replmd_extended,
1445 int repl_meta_data_module_init(void)
1447 return ldb_register_module(&replmd_ops);