r22472: Commit the start of the DRSUAPI pull replication service.
authorStefan Metzmacher <metze@samba.org>
Mon, 23 Apr 2007 00:43:47 +0000 (00:43 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:51:30 +0000 (14:51 -0500)
It doesn't work completely yet because we aren't able to
resolve DNS SRV records. And also we also need a kdc locator
plugin...

But with some hacks the pull replication works fine

metze
(This used to be commit 0dc78f7439c9c786fd8c592960f9669dea40b811)

source4/dsdb/config.mk
source4/dsdb/repl/drepl_out_helpers.c [new file with mode: 0644]
source4/dsdb/repl/drepl_out_helpers.h [new file with mode: 0644]
source4/dsdb/repl/drepl_out_pull.c [new file with mode: 0644]
source4/dsdb/repl/drepl_partitions.c [new file with mode: 0644]
source4/dsdb/repl/drepl_periodic.c [new file with mode: 0644]
source4/dsdb/repl/drepl_service.c
source4/dsdb/repl/drepl_service.h

index 1f0bda17b057d00ce6de664176f997e5de2ebd59..9555afd5cb42dfc320374196800e6aaa2c43b262 100644 (file)
@@ -30,7 +30,11 @@ OBJ_FILES = \
 INIT_FUNCTION = server_service_drepl_init
 SUBSYSTEM = service
 OBJ_FILES = \
-               repl/drepl_service.o
+               repl/drepl_service.o \
+               repl/drepl_periodic.o \
+               repl/drepl_partitions.o \
+               repl/drepl_out_pull.o \
+               repl/drepl_out_helpers.o
 PRIVATE_PROTO_HEADER = repl/drepl_service_proto.h
 PRIVATE_DEPENDENCIES = \
                SAMDB \
diff --git a/source4/dsdb/repl/drepl_out_helpers.c b/source4/dsdb/repl/drepl_out_helpers.c
new file mode 100644 (file)
index 0000000..7ff9efe
--- /dev/null
@@ -0,0 +1,420 @@
+/* 
+   Unix SMB/CIFS mplementation.
+   DSDB replication service helper function for outgoing traffic
+   
+   Copyright (C) Stefan Metzmacher 2007
+    
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   
+*/
+
+#include "includes.h"
+#include "dsdb/samdb/samdb.h"
+#include "auth/auth.h"
+#include "smbd/service.h"
+#include "lib/events/events.h"
+#include "lib/messaging/irpc.h"
+#include "dsdb/repl/drepl_service.h"
+#include "lib/ldb/include/ldb_errors.h"
+#include "lib/util/dlinklist.h"
+#include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsuapi.h"
+#include "librpc/gen_ndr/ndr_drsblobs.h"
+#include "libcli/composite/composite.h"
+#include "auth/gensec/gensec.h"
+
+struct dreplsrv_out_drsuapi_state {
+       struct composite_context *creq;
+
+       struct dreplsrv_out_connection *conn;
+
+       struct dreplsrv_drsuapi_connection *drsuapi;
+
+       struct drsuapi_DsBindInfoCtr bind_info_ctr;
+       struct drsuapi_DsBind bind_r;
+};
+
+static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq);
+
+static struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn)
+{
+       struct composite_context *c;
+       struct composite_context *creq;
+       struct dreplsrv_out_drsuapi_state *st;
+
+       c = composite_create(conn, conn->service->task->event_ctx);
+       if (c == NULL) return NULL;
+
+       st = talloc_zero(c, struct dreplsrv_out_drsuapi_state);
+       if (composite_nomem(st, c)) return c;
+
+       c->private_data = st;
+
+       st->creq        = c;
+       st->conn        = conn;
+       st->drsuapi     = conn->drsuapi;
+
+       if (st->drsuapi && !st->drsuapi->pipe->conn->dead) {
+               composite_done(c);
+               return c;
+       } else if (st->drsuapi && st->drsuapi->pipe->conn->dead) {
+               talloc_free(st->drsuapi);
+               conn->drsuapi = NULL;
+       }
+
+       st->drsuapi     = talloc_zero(st, struct dreplsrv_drsuapi_connection);
+       if (composite_nomem(st->drsuapi, c)) return c;
+
+       creq = dcerpc_pipe_connect_b_send(st, conn->binding, &dcerpc_table_drsuapi,
+                                         conn->service->system_session_info->credentials,
+                                         c->event_ctx);
+       composite_continue(c, creq, dreplsrv_out_drsuapi_connect_recv, st);
+
+       return c;
+}
+
+static void dreplsrv_out_drsuapi_bind_send(struct dreplsrv_out_drsuapi_state *st);
+
+static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq)
+{
+       struct dreplsrv_out_drsuapi_state *st = talloc_get_type(creq->async.private_data,
+                                               struct dreplsrv_out_drsuapi_state);
+       struct composite_context *c = st->creq;
+
+       c->status = dcerpc_pipe_connect_b_recv(creq, st->drsuapi, &st->drsuapi->pipe);
+       if (!composite_is_ok(c)) return;
+
+       c->status = gensec_session_key(st->drsuapi->pipe->conn->security_state.generic_state,
+                                      &st->drsuapi->gensec_skey);
+       if (!composite_is_ok(c)) return;
+
+       dreplsrv_out_drsuapi_bind_send(st);
+}
+
+static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req);
+
+static void dreplsrv_out_drsuapi_bind_send(struct dreplsrv_out_drsuapi_state *st)
+{
+       struct composite_context *c = st->creq;
+       struct rpc_request *req;
+
+       st->bind_info_ctr.length        = 28;
+       st->bind_info_ctr.info.info28   = st->conn->service->bind_info28;
+
+       st->bind_r.in.bind_guid = &st->conn->service->ntds_guid;
+       st->bind_r.in.bind_info = &st->bind_info_ctr;
+       st->bind_r.out.bind_handle = &st->drsuapi->bind_handle;
+
+       req = dcerpc_drsuapi_DsBind_send(st->drsuapi->pipe, st, &st->bind_r);
+       composite_continue_rpc(c, req, dreplsrv_out_drsuapi_bind_recv, st);
+}
+
+static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req)
+{
+       struct dreplsrv_out_drsuapi_state *st = talloc_get_type(req->async.private,
+                                               struct dreplsrv_out_drsuapi_state);
+       struct composite_context *c = st->creq;
+
+       c->status = dcerpc_ndr_request_recv(req);
+       if (!composite_is_ok(c)) return;
+
+       if (!W_ERROR_IS_OK(st->bind_r.out.result)) {
+               composite_error(c, werror_to_ntstatus(st->bind_r.out.result));
+               return;
+       }
+
+       ZERO_STRUCT(st->drsuapi->remote_info28);
+       if (st->bind_r.out.bind_info) {
+               switch (st->bind_r.out.bind_info->length) {
+               case 24: {
+                       struct drsuapi_DsBindInfo24 *info24;
+                       info24 = &st->bind_r.out.bind_info->info.info24;
+                       st->drsuapi->remote_info28.supported_extensions = info24->supported_extensions;
+                       st->drsuapi->remote_info28.site_guid            = info24->site_guid;
+                       st->drsuapi->remote_info28.u1                   = info24->u1;
+                       st->drsuapi->remote_info28.repl_epoch           = 0;
+                       break;
+               }
+               case 28:
+                       st->drsuapi->remote_info28 = st->bind_r.out.bind_info->info.info28;
+                       break;
+               }
+       }
+
+       composite_done(c);
+}
+
+static NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c)
+{
+       NTSTATUS status;
+       struct dreplsrv_out_drsuapi_state *st = talloc_get_type(c->private_data,
+                                               struct dreplsrv_out_drsuapi_state);
+
+       status = composite_wait(c);
+
+       if (NT_STATUS_IS_OK(status)) {
+               st->conn->drsuapi = talloc_steal(st->conn, st->drsuapi);
+       }
+
+       talloc_free(c);
+       return status;
+}
+
+struct dreplsrv_op_pull_source_state {
+       struct composite_context *creq;
+
+       struct dreplsrv_out_operation *op;
+
+       struct dreplsrv_drsuapi_connection *drsuapi;
+
+       bool have_all;
+
+       uint32_t ctr_level;
+       struct drsuapi_DsGetNCChangesCtr1 *ctr1;
+       struct drsuapi_DsGetNCChangesCtr6 *ctr6;
+};
+
+static void dreplsrv_op_pull_source_connect_recv(struct composite_context *creq);
+
+struct composite_context *dreplsrv_op_pull_source_send(struct dreplsrv_out_operation *op)
+{
+       struct composite_context *c;
+       struct composite_context *creq;
+       struct dreplsrv_op_pull_source_state *st;
+
+       c = composite_create(op, op->service->task->event_ctx);
+       if (c == NULL) return NULL;
+
+       st = talloc_zero(c, struct dreplsrv_op_pull_source_state);
+       if (composite_nomem(st, c)) return c;
+
+       st->creq        = c;
+       st->op          = op;
+
+       creq = dreplsrv_out_drsuapi_send(op->source_dsa->conn);
+       composite_continue(c, creq, dreplsrv_op_pull_source_connect_recv, st);
+
+       return c;
+}
+
+static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st);
+
+static void dreplsrv_op_pull_source_connect_recv(struct composite_context *creq)
+{
+       struct dreplsrv_op_pull_source_state *st = talloc_get_type(creq->async.private_data,
+                                                  struct dreplsrv_op_pull_source_state);
+       struct composite_context *c = st->creq;
+
+       c->status = dreplsrv_out_drsuapi_recv(creq);
+       if (!composite_is_ok(c)) return;
+
+       dreplsrv_op_pull_source_get_changes_send(st);
+}
+
+static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req);
+
+static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st)
+{
+       struct composite_context *c = st->creq;
+       struct repsFromTo1 *rf1 = st->op->source_dsa->repsFrom1;
+       struct dreplsrv_service *service = st->op->service;
+       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
+       struct rpc_request *req;
+       struct drsuapi_DsGetNCChanges *r;
+
+       r = talloc(st, struct drsuapi_DsGetNCChanges);
+       if (composite_nomem(r, c)) return;
+
+       r->in.level = talloc(r, int32_t);
+       if (composite_nomem(r->in.level, c)) return;
+       r->out.level = talloc(r, int32_t);
+       if (composite_nomem(r->out.level, c)) return;
+
+       r->in.bind_handle       = &drsuapi->bind_handle;
+       if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
+               *r->in.level                            = 8;
+               r->in.req.req8.destination_dsa_guid     = service->ntds_guid;
+               r->in.req.req8.source_dsa_invocation_id = rf1->source_dsa_invocation_id;
+               r->in.req.req8.naming_context           = &partition->nc;
+               r->in.req.req8.highwatermark            = rf1->highwatermark;
+               r->in.req.req8.uptodateness_vector      = NULL;/*&partition->uptodatevector_ex;*/
+               r->in.req.req8.replica_flags            = rf1->replica_flags;
+               r->in.req.req8.max_object_count         = 133;
+               r->in.req.req8.max_ndr_size             = 1336811;
+               r->in.req.req8.unknown4                 = 0;
+               r->in.req.req8.h1                       = 0;
+               r->in.req.req8.unique_ptr1              = 0;
+               r->in.req.req8.unique_ptr2              = 0;
+               r->in.req.req8.mapping_ctr.num_mappings = 0;
+               r->in.req.req8.mapping_ctr.mappings     = NULL;
+       } else {
+               *r->in.level                            = 5;
+               r->in.req.req5.destination_dsa_guid     = service->ntds_guid;
+               r->in.req.req5.source_dsa_invocation_id = rf1->source_dsa_invocation_id;
+               r->in.req.req5.naming_context           = &partition->nc;
+               r->in.req.req5.highwatermark            = rf1->highwatermark;
+               r->in.req.req5.uptodateness_vector      = NULL;/*&partition->uptodatevector_ex;*/
+               r->in.req.req5.replica_flags            = rf1->replica_flags;
+               r->in.req.req5.max_object_count         = 133;
+               r->in.req.req5.max_ndr_size             = 1336770;
+               r->in.req.req5.unknown4                 = 0;
+               r->in.req.req5.h1                       = 0;
+       }
+
+       req = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
+       composite_continue_rpc(c, req, dreplsrv_op_pull_source_get_changes_recv, st);
+}
+
+static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
+                                                      struct drsuapi_DsGetNCChanges *r,
+                                                      uint32_t ctr_level,
+                                                      struct drsuapi_DsGetNCChangesCtr1 *ctr1,
+                                                      struct drsuapi_DsGetNCChangesCtr6 *ctr6);
+
+static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req)
+{
+       struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private,
+                                                  struct dreplsrv_op_pull_source_state);
+       struct composite_context *c = st->creq;
+       struct drsuapi_DsGetNCChanges *r = talloc_get_type(req->ndr.struct_ptr,
+                                          struct drsuapi_DsGetNCChanges);
+       uint32_t ctr_level = 0;
+       struct drsuapi_DsGetNCChangesCtr1 *ctr1 = NULL;
+       struct drsuapi_DsGetNCChangesCtr6 *ctr6 = NULL;
+
+       c->status = dcerpc_ndr_request_recv(req);
+       if (!composite_is_ok(c)) return;
+
+       if (!W_ERROR_IS_OK(r->out.result)) {
+               composite_error(c, werror_to_ntstatus(r->out.result));
+               return;
+       }
+
+       if (*r->out.level == 1) {
+               ctr_level = 1;
+               ctr1 = &r->out.ctr.ctr1;
+       } else if (*r->out.level == 2) {
+               ctr_level = 1;
+               ctr1 = r->out.ctr.ctr2.ctr.mszip1.ctr1;
+       } else if (*r->out.level == 6) {
+               ctr_level = 6;
+               ctr6 = &r->out.ctr.ctr6;
+       } else if (*r->out.level == 7 &&
+                  r->out.ctr.ctr7.level == 6 &&
+                  r->out.ctr.ctr7.type == DRSUAPI_COMPRESSION_TYPE_MSZIP) {
+               ctr_level = 6;
+               ctr6 = r->out.ctr.ctr7.ctr.mszip6.ctr6;
+       } else {
+               composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
+               return;
+       }
+
+       dreplsrv_op_pull_source_apply_changes_send(st, r, ctr_level, ctr1, ctr6);
+}
+
+static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
+                                                      struct drsuapi_DsGetNCChanges *r,
+                                                      uint32_t ctr_level,
+                                                      struct drsuapi_DsGetNCChangesCtr1 *ctr1,
+                                                      struct drsuapi_DsGetNCChangesCtr6 *ctr6)
+{
+       struct composite_context *c = st->creq;
+       struct repsFromTo1 rf1 = *st->op->source_dsa->repsFrom1;
+       struct dreplsrv_service *service = st->op->service;
+       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
+       const struct drsuapi_DsReplicaOIDMapping_Ctr *mapping_ctr;
+       uint32_t object_count;
+       struct drsuapi_DsReplicaObjectListItemEx *first_object;
+       uint32_t linked_attributes_count;
+       struct drsuapi_DsReplicaLinkedAttribute *linked_attributes;
+       const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
+       bool more_data = false;
+       WERROR status;
+
+       switch (ctr_level) {
+       case 1:
+               mapping_ctr                     = &ctr1->mapping_ctr;
+               object_count                    = ctr1->object_count;
+               first_object                    = ctr1->first_object;
+               linked_attributes_count         = 0;
+               linked_attributes               = NULL;
+               rf1.highwatermark               = ctr1->new_highwatermark;
+               uptodateness_vector             = NULL; /* TODO: map it */
+               break;
+       case 6:
+               mapping_ctr                     = &ctr6->mapping_ctr;
+               object_count                    = ctr6->object_count;
+               first_object                    = ctr6->first_object;
+               linked_attributes_count         = ctr6->linked_attributes_count;
+               linked_attributes               = ctr6->linked_attributes;
+               rf1.highwatermark               = ctr6->new_highwatermark;
+               uptodateness_vector             = ctr6->uptodateness_vector;
+               break;
+       default:
+               composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
+               return;
+       }
+
+       status = dsdb_extended_replicated_objects_commit(service->samdb,
+                                                        partition->nc.dn,
+                                                        mapping_ctr,
+                                                        object_count,
+                                                        first_object,
+                                                        linked_attributes_count,
+                                                        linked_attributes,
+                                                        &rf1,
+                                                        uptodateness_vector,
+                                                        &drsuapi->gensec_skey,
+                                                        st, NULL);
+       if (!W_ERROR_IS_OK(status)) {
+               DEBUG(0,("Failed to commit objects: %s\n", win_errstr(status)));
+               composite_error(c, werror_to_ntstatus(status));
+               return;
+       }
+
+       /* if it applied fine, we need to update the highwatermark */
+       *st->op->source_dsa->repsFrom1 = rf1;
+
+       /*
+        * TODO: update our uptodatevector!
+        */
+
+       /*
+        * if the tmp_highest_usn is higher than highest_usn
+        * there's more to pull from this source_dsa
+        */
+       if (rf1.highwatermark.tmp_highest_usn > rf1.highwatermark.highest_usn) {
+               more_data = true;
+       }
+
+       if (more_data) {
+               dreplsrv_op_pull_source_get_changes_send(st);
+               return;
+       }
+
+       composite_done(c);
+}
+
+WERROR dreplsrv_op_pull_source_recv(struct composite_context *c)
+{
+       NTSTATUS status;
+
+       status = composite_wait(c);
+
+       talloc_free(c);
+       return ntstatus_to_werror(status);
+}
diff --git a/source4/dsdb/repl/drepl_out_helpers.h b/source4/dsdb/repl/drepl_out_helpers.h
new file mode 100644 (file)
index 0000000..0063aac
--- /dev/null
@@ -0,0 +1,27 @@
+/* 
+   Unix SMB/CIFS mplementation.
+   DSDB replication service helper function for outgoing traffic
+   
+   Copyright (C) Stefan Metzmacher 2007
+    
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   
+*/
+
+#ifndef DREPL_OUT_HELPERS_H
+#define DREPL_OUT_HELPERS_H
+
+
+#endif /* DREPL_OUT_HELPERS_H */
diff --git a/source4/dsdb/repl/drepl_out_pull.c b/source4/dsdb/repl/drepl_out_pull.c
new file mode 100644 (file)
index 0000000..2ac8ac9
--- /dev/null
@@ -0,0 +1,155 @@
+/* 
+   Unix SMB/CIFS mplementation.
+   DSDB replication service outgoing Pull-Replication
+   
+   Copyright (C) Stefan Metzmacher 2007
+    
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   
+*/
+
+#include "includes.h"
+#include "dsdb/samdb/samdb.h"
+#include "auth/auth.h"
+#include "smbd/service.h"
+#include "lib/events/events.h"
+#include "lib/messaging/irpc.h"
+#include "dsdb/repl/drepl_service.h"
+#include "lib/ldb/include/ldb_errors.h"
+#include "lib/util/dlinklist.h"
+#include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsuapi.h"
+#include "librpc/gen_ndr/ndr_drsblobs.h"
+#include "libcli/composite/composite.h"
+
+static WERROR dreplsrv_schedule_partition_pull_source(struct dreplsrv_service *s,
+                                                     struct dreplsrv_partition *p,
+                                                     struct dreplsrv_partition_source_dsa *source,
+                                                     TALLOC_CTX *mem_ctx)
+{
+       struct dreplsrv_out_operation *op;
+
+       op = talloc_zero(mem_ctx, struct dreplsrv_out_operation);
+       W_ERROR_HAVE_NO_MEMORY(op);
+
+       op->service     = s;
+       op->source_dsa  = source;
+
+       DLIST_ADD_END(s->ops.pending, op, struct dreplsrv_out_operation *);
+       talloc_steal(s, op);
+       return WERR_OK;
+}
+
+static WERROR dreplsrv_schedule_partition_pull(struct dreplsrv_service *s,
+                                              struct dreplsrv_partition *p,
+                                              TALLOC_CTX *mem_ctx)
+{
+       WERROR status;
+       struct dreplsrv_partition_source_dsa *cur;
+
+       for (cur = p->sources; cur; cur = cur->next) {
+               status = dreplsrv_schedule_partition_pull_source(s, p, cur, mem_ctx);
+               W_ERROR_NOT_OK_RETURN(status);
+       }
+
+       return WERR_OK;
+}
+
+WERROR dreplsrv_schedule_pull_replication(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx)
+{
+       WERROR status;
+       struct dreplsrv_partition *p;
+
+       for (p = s->partitions; p; p = p->next) {
+               status = dreplsrv_schedule_partition_pull(s, p, mem_ctx);
+               W_ERROR_NOT_OK_RETURN(status);
+       }
+
+       return WERR_OK;
+}
+
+static void dreplsrv_pending_op_callback(struct dreplsrv_out_operation *op)
+{
+       struct repsFromTo1 *rf = op->source_dsa->repsFrom1;
+       struct dreplsrv_service *s = op->service;
+       time_t t;
+       NTTIME now;
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       rf->result_last_attempt = dreplsrv_op_pull_source_recv(op->creq);
+       if (W_ERROR_IS_OK(rf->result_last_attempt)) {
+               rf->consecutive_sync_failures   = 0;
+               rf->last_success                = now;
+               DEBUG(2,("dreplsrv_op_pull_source(%s)\n",
+                       win_errstr(rf->result_last_attempt)));
+               goto done;
+       }
+
+       rf->consecutive_sync_failures++;
+
+       DEBUG(1,("dreplsrv_op_pull_source(%s/%s) failures[%u]\n",
+               win_errstr(rf->result_last_attempt),
+               nt_errstr(werror_to_ntstatus(rf->result_last_attempt)),
+               rf->consecutive_sync_failures));
+
+done:
+       talloc_free(op);
+       s->ops.current = NULL;
+       dreplsrv_run_pending_ops(s);
+}
+
+static void dreplsrv_pending_op_callback_creq(struct composite_context *creq)
+{
+       struct dreplsrv_out_operation *op = talloc_get_type(creq->async.private_data,
+                                                          struct dreplsrv_out_operation);
+       dreplsrv_pending_op_callback(op);
+}
+
+void dreplsrv_run_pending_ops(struct dreplsrv_service *s)
+{
+       struct dreplsrv_out_operation *op;
+       time_t t;
+       NTTIME now;
+
+       if (s->ops.current) {
+               /* if there's still one running, we're done */
+               return;
+       }
+
+       if (!s->ops.pending) {
+               /* if there're no pending operations, we're done */
+               return;
+       }
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       op = s->ops.pending;
+       s->ops.current = op;
+       DLIST_REMOVE(s->ops.pending, op);
+
+       op->source_dsa->repsFrom1->last_attempt = now;
+
+       op->creq = dreplsrv_op_pull_source_send(op);
+       if (!op->creq) {
+               dreplsrv_pending_op_callback(op);
+               return;
+       }
+
+       op->creq->async.fn              = dreplsrv_pending_op_callback_creq;
+       op->creq->async.private_data    = op;
+}
diff --git a/source4/dsdb/repl/drepl_partitions.c b/source4/dsdb/repl/drepl_partitions.c
new file mode 100644 (file)
index 0000000..56aff06
--- /dev/null
@@ -0,0 +1,268 @@
+/* 
+   Unix SMB/CIFS mplementation.
+   DSDB replication service
+   
+   Copyright (C) Stefan Metzmacher 2007
+    
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   
+*/
+
+#include "includes.h"
+#include "dsdb/samdb/samdb.h"
+#include "auth/auth.h"
+#include "smbd/service.h"
+#include "lib/events/events.h"
+#include "lib/messaging/irpc.h"
+#include "dsdb/repl/drepl_service.h"
+#include "lib/ldb/include/ldb_errors.h"
+#include "lib/util/dlinklist.h"
+#include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsuapi.h"
+#include "librpc/gen_ndr/ndr_drsblobs.h"
+
+static WERROR dreplsrv_refresh_partitions(struct dreplsrv_service *s);
+
+WERROR dreplsrv_load_partitions(struct dreplsrv_service *s)
+{
+       WERROR status;
+       struct ldb_dn *basedn;
+       struct ldb_result *r;
+       struct ldb_message_element *el;
+       static const char *attrs[] = { "namingContexts", NULL };
+       uint32_t i;
+       int ret;
+
+       basedn = ldb_dn_new(s, s->samdb, NULL);
+       W_ERROR_HAVE_NO_MEMORY(basedn);
+
+       ret = ldb_search(s->samdb, basedn, LDB_SCOPE_BASE, 
+                        "(objectClass=*)", attrs, &r);
+       talloc_free(basedn);
+       if (ret != LDB_SUCCESS) {
+               return WERR_FOOBAR;
+       } else if (r->count != 1) {
+               talloc_free(r);
+               return WERR_FOOBAR;
+       }
+       talloc_steal(s, r);
+
+       el = ldb_msg_find_element(r->msgs[0], "namingContexts");
+       if (!el) {
+               return WERR_FOOBAR;
+       }
+
+       for (i=0; el && i < el->num_values; i++) {
+               const char *v = (const char *)el->values[i].data;
+               struct ldb_dn *pdn;
+               struct dreplsrv_partition *p;
+
+               pdn = ldb_dn_new(s, s->samdb, v);
+               if (!ldb_dn_validate(pdn)) {
+                       return WERR_FOOBAR;
+               }
+
+               p = talloc_zero(s, struct dreplsrv_partition);
+               W_ERROR_HAVE_NO_MEMORY(p);
+
+               p->dn = talloc_steal(p, pdn);
+
+               DLIST_ADD(s->partitions, p);
+
+               DEBUG(2, ("dreplsrv_partition[%s] loaded\n", v));
+       }
+
+       talloc_free(r);
+
+       status = dreplsrv_refresh_partitions(s);
+       W_ERROR_NOT_OK_RETURN(status);
+
+       return WERR_OK;
+}
+
+static WERROR dreplsrv_out_connection_attach(struct dreplsrv_service *s,
+                                            const struct repsFromTo1 *rft,
+                                            struct dreplsrv_out_connection **_conn)
+{
+       struct dreplsrv_out_connection *cur, *conn = NULL;
+       const char *hostname;
+
+       if (!rft->other_info) {
+               return WERR_FOOBAR;
+       }
+
+       if (!rft->other_info->dns_name) {
+               return WERR_FOOBAR;
+       }
+
+       hostname = rft->other_info->dns_name;
+
+       for (cur = s->connections; cur; cur = cur->next) {              
+               if (strcmp(cur->binding->host, hostname) == 0) {
+                       conn = cur;
+                       break;
+               }
+       }
+
+       if (!conn) {
+               NTSTATUS nt_status;
+               char *binding_str;
+
+               conn = talloc_zero(s, struct dreplsrv_out_connection);
+               W_ERROR_HAVE_NO_MEMORY(conn);
+
+               conn->service   = s;
+
+               binding_str = talloc_asprintf(conn, "ncacn_ip_tcp:%s[krb5,seal]",
+                                             hostname);
+               W_ERROR_HAVE_NO_MEMORY(binding_str);
+               nt_status = dcerpc_parse_binding(conn, binding_str, &conn->binding);
+               talloc_free(binding_str);
+               if (!NT_STATUS_IS_OK(nt_status)) {
+                       return ntstatus_to_werror(nt_status);
+               }
+
+               DLIST_ADD_END(s->connections, conn, struct dreplsrv_out_connection *);
+
+               DEBUG(2,("dreplsrv_out_connection_attach(%s): create\n", conn->binding->host));
+       } else {
+               DEBUG(2,("dreplsrv_out_connection_attach(%s): attach\n", conn->binding->host));
+       }
+
+       *_conn = conn;
+       return WERR_OK;
+}
+
+static WERROR dreplsrv_partition_add_source_dsa(struct dreplsrv_service *s,
+                                               struct dreplsrv_partition *p,
+                                               const struct ldb_val *val)
+{
+       WERROR status;
+       NTSTATUS nt_status;
+       struct dreplsrv_partition_source_dsa *source;
+
+       source = talloc_zero(p, struct dreplsrv_partition_source_dsa);
+       W_ERROR_HAVE_NO_MEMORY(source);
+
+       nt_status = ndr_pull_struct_blob(val, source, &source->_repsFromBlob,
+                                        (ndr_pull_flags_fn_t)ndr_pull_repsFromToBlob);
+       if (!NT_STATUS_IS_OK(nt_status)) {
+               return ntstatus_to_werror(nt_status);
+       }
+       /* NDR_PRINT_DEBUG(repsFromToBlob, &source->_repsFromBlob); */
+       if (source->_repsFromBlob.version != 1) {
+               return WERR_DS_DRA_INTERNAL_ERROR;
+       }
+
+       source->partition       = p;
+       source->repsFrom1       = &source->_repsFromBlob.ctr.ctr1;
+
+       status = dreplsrv_out_connection_attach(s, source->repsFrom1, &source->conn);
+       W_ERROR_NOT_OK_RETURN(status);
+
+       DLIST_ADD_END(p->sources, source, struct dreplsrv_partition_source_dsa *);
+       return WERR_OK;
+}
+
+static WERROR dreplsrv_refresh_partition(struct dreplsrv_service *s,
+                                        struct dreplsrv_partition *p,
+                                        TALLOC_CTX *mem_ctx)
+{
+       WERROR status;
+       NTSTATUS nt_status;
+       const struct ldb_val *ouv_value;
+       struct replUpToDateVectorBlob ouv;
+       struct dom_sid *nc_sid;
+       struct ldb_message_element *orf_el = NULL;
+       struct ldb_result *r;
+       uint32_t i;
+       int ret;
+       static const char *attrs[] = {
+               "objectSid",
+               "objectGUID",
+               "replUpToDateVector",
+               "repsFrom",
+               NULL
+       };
+
+       DEBUG(2, ("dreplsrv_refresh_partition(%s)\n",
+               ldb_dn_get_linearized(p->dn)));
+
+       ret = ldb_search(s->samdb, p->dn, LDB_SCOPE_BASE,
+                        "(objectClass=*)", attrs, &r);
+       if (ret != LDB_SUCCESS) {
+               return WERR_FOOBAR;
+       } else if (r->count != 1) {
+               talloc_free(r);
+               return WERR_FOOBAR;
+       }
+       talloc_steal(mem_ctx, r);
+
+       ZERO_STRUCT(p->nc);
+       p->nc.dn        = ldb_dn_alloc_linearized(p, p->dn);
+       W_ERROR_HAVE_NO_MEMORY(p->nc.dn);
+       p->nc.guid      = samdb_result_guid(r->msgs[0], "objectGUID");
+       nc_sid          = samdb_result_dom_sid(p, r->msgs[0], "objectSid");
+       if (nc_sid) {
+               p->nc.sid       = *nc_sid;
+       }
+
+       ouv_value = ldb_msg_find_ldb_val(r->msgs[0], "replUpToDateVector");
+       if (ouv_value) {
+               nt_status = ndr_pull_struct_blob(ouv_value, mem_ctx, &ouv,
+                                                (ndr_pull_flags_fn_t)ndr_pull_replUpToDateVectorBlob);
+               if (!NT_STATUS_IS_OK(nt_status)) {
+                       return ntstatus_to_werror(nt_status);
+               }
+               /* NDR_PRINT_DEBUG(replUpToDateVectorBlob, &ouv); */
+               if (ouv.version != 2) {
+                       return WERR_DS_DRA_INTERNAL_ERROR;
+               }
+
+               p->uptodatevector.count         = ouv.ctr.ctr2.count;
+               p->uptodatevector.reserved      = ouv.ctr.ctr2.reserved;
+               p->uptodatevector.cursors       = talloc_steal(p, ouv.ctr.ctr2.cursors);
+       }
+
+       /*
+        * TODO: add our own uptodatevector cursor
+        */
+
+
+       orf_el = ldb_msg_find_element(r->msgs[0], "repsFrom");
+       if (orf_el) {
+               for (i=0; i < orf_el->num_values; i++) {
+                       status = dreplsrv_partition_add_source_dsa(s, p, &orf_el->values[i]);
+                       W_ERROR_NOT_OK_RETURN(status);  
+               }
+       }
+
+       talloc_free(r);
+
+       return WERR_OK;
+}
+
+static WERROR dreplsrv_refresh_partitions(struct dreplsrv_service *s)
+{
+       WERROR status;
+       struct dreplsrv_partition *p;
+
+       for (p = s->partitions; p; p = p->next) {
+               status = dreplsrv_refresh_partition(s, p, p);
+               W_ERROR_NOT_OK_RETURN(status);
+       }
+
+       return WERR_OK;
+}
diff --git a/source4/dsdb/repl/drepl_periodic.c b/source4/dsdb/repl/drepl_periodic.c
new file mode 100644 (file)
index 0000000..38e9c71
--- /dev/null
@@ -0,0 +1,110 @@
+/* 
+   Unix SMB/CIFS mplementation.
+   DSDB replication service periodic handling
+   
+   Copyright (C) Stefan Metzmacher 2007
+    
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   
+*/
+
+#include "includes.h"
+#include "dsdb/samdb/samdb.h"
+#include "auth/auth.h"
+#include "smbd/service.h"
+#include "lib/events/events.h"
+#include "lib/messaging/irpc.h"
+#include "dsdb/repl/drepl_service.h"
+#include "lib/ldb/include/ldb_errors.h"
+#include "lib/util/dlinklist.h"
+#include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsuapi.h"
+#include "librpc/gen_ndr/ndr_drsblobs.h"
+
+static void dreplsrv_periodic_run(struct dreplsrv_service *service);
+
+static void dreplsrv_periodic_handler_te(struct event_context *ev, struct timed_event *te,
+                                        struct timeval t, void *ptr)
+{
+       struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service);
+       WERROR status;
+
+       service->periodic.te = NULL;
+
+       dreplsrv_periodic_run(service);
+
+       status = dreplsrv_periodic_schedule(service, service->periodic.interval);
+       if (!W_ERROR_IS_OK(status)) {
+               task_server_terminate(service->task, win_errstr(status));
+               return;
+       }
+}
+
+WERROR dreplsrv_periodic_schedule(struct dreplsrv_service *service, uint32_t next_interval)
+{
+       TALLOC_CTX *tmp_mem;
+       struct timed_event *new_te;
+       struct timeval next_time;
+
+       /* prevent looping */
+       if (next_interval == 0) next_interval = 1;
+
+       next_time = timeval_current_ofs(next_interval, 50);
+
+       if (service->periodic.te) {
+               /*
+                * if the timestamp of the new event is higher,
+                * as current next we don't need to reschedule
+                */
+               if (timeval_compare(&next_time, &service->periodic.next_event) > 0) {
+                       return WERR_OK;
+               }
+       }
+
+       /* reset the next scheduled timestamp */
+       service->periodic.next_event = next_time;
+
+       new_te = event_add_timed(service->task->event_ctx, service,
+                                service->periodic.next_event,
+                                dreplsrv_periodic_handler_te, service);
+       W_ERROR_HAVE_NO_MEMORY(new_te);
+
+       tmp_mem = talloc_new(service);
+       DEBUG(2,("dreplsrv_periodic_schedule(%u) %sscheduled for: %s\n",
+               next_interval,
+               (service->periodic.te?"re":""),
+               nt_time_string(tmp_mem, timeval_to_nttime(&next_time))));
+       talloc_free(tmp_mem);
+
+       talloc_free(service->periodic.te);
+       service->periodic.te = new_te;
+
+       return WERR_OK;
+}
+
+static void dreplsrv_periodic_run(struct dreplsrv_service *service)
+{
+       TALLOC_CTX *mem_ctx;
+
+       DEBUG(2,("dreplsrv_periodic_run(): schedule pull replication\n"));
+
+       mem_ctx = talloc_new(service);
+       dreplsrv_schedule_pull_replication(service, mem_ctx);
+       talloc_free(mem_ctx);
+
+       DEBUG(2,("dreplsrv_periodic_run(): run pending_ops\n"));
+
+       dreplsrv_run_pending_ops(service);
+}
index 9a3cac424728f5b717deabeb7e8546f57b45a8f9..17690d135d0aa7af25935ba20879057a995c9188 100644 (file)
@@ -47,70 +47,68 @@ static WERROR dreplsrv_init_creds(struct dreplsrv_service *service)
 
 static WERROR dreplsrv_connect_samdb(struct dreplsrv_service *service)
 {
+       const struct GUID *ntds_guid;
+       struct drsuapi_DsBindInfo28 *bind_info28;
+
        service->samdb = samdb_connect(service, service->system_session_info);
        if (!service->samdb) {
                return WERR_DS_SERVICE_UNAVAILABLE;
        }
 
-       return WERR_OK;
-}
-
-static void dreplsrv_periodic_handler_te(struct event_context *ev, struct timed_event *te,
-                                        struct timeval t, void *ptr)
-{
-       struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service);
-       WERROR status;
-
-       service->periodic.te = NULL;
-
-       status = dreplsrv_periodic_schedule(service, service->periodic.interval);
-       if (!W_ERROR_IS_OK(status)) {
-               task_server_terminate(service->task, win_errstr(status));
-               return;
+       ntds_guid = samdb_ntds_objectGUID(service->samdb);
+       if (!ntds_guid) {
+               return WERR_DS_SERVICE_UNAVAILABLE;
        }
-}
 
-WERROR dreplsrv_periodic_schedule(struct dreplsrv_service *service, uint32_t next_interval)
-{
-       TALLOC_CTX *tmp_mem;
-       struct timed_event *new_te;
-       struct timeval next_time;
-
-       /* prevent looping */
-       if (next_interval == 0) next_interval = 1;
-
-       next_time = timeval_current_ofs(next_interval, 5000);
-
-       if (service->periodic.te) {
-               /*
-                * if the timestamp of the new event is higher,
-                * as current next we don't need to reschedule
-                */
-               if (timeval_compare(&next_time, &service->periodic.next_event) > 0) {
-                       return WERR_OK;
-               }
+       service->ntds_guid = *ntds_guid;
+
+       bind_info28                             = &service->bind_info28;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_BASE;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_ASYNC_REPLICATION;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_REMOVEAPI;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_MOVEREQ_V2;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GETCHG_COMPRESS;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_DCINFO_V1;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_RESTORE_USN_OPTIMIZATION;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_KCC_EXECUTE;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_ADDENTRY_V2;
+#if 0
+       if (s->domain_behavior_version == 2) {
+               /* TODO: find out how this is really triggered! */
+               bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_LINKED_VALUE_REPLICATION;
        }
-
-       /* reset the next scheduled timestamp */
-       service->periodic.next_event = next_time;
-
-       new_te = event_add_timed(service->task->event_ctx, service,
-                                service->periodic.next_event,
-                                dreplsrv_periodic_handler_te, service);
-       W_ERROR_HAVE_NO_MEMORY(new_te);
-
-       tmp_mem = talloc_new(service);
-       DEBUG(6,("dreplsrv_periodic_schedule(%u) %sscheduled for: %s\n",
-               next_interval,
-               (service->periodic.te?"re":""),
-               nt_time_string(tmp_mem, timeval_to_nttime(&next_time))));
-       talloc_free(tmp_mem);
-
-       talloc_free(service->periodic.te);
-       service->periodic.te = new_te;
+#endif
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_DCINFO_V2;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_INSTANCE_TYPE_NOT_REQ_ON_MOD;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_CRYPTO_BIND;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GET_REPL_INFO;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_STRONG_ENCRYPTION;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_DCINFO_V01;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_TRANSITIVE_MEMBERSHIP;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_ADD_SID_HISTORY;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_POST_BETA3;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_00100000;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GET_MEMBERSHIPS2;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V6;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_NONDOMAIN_NCS;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GETCHGREPLY_V5;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GETCHGREPLY_V6;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_ADDENTRYREPLY_V3;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_GETCHGREPLY_V7;
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_VERIFY_OBJECT;
+#if 0 /* we don't support XPRESS compression yet */
+       bind_info28->supported_extensions       |= DRSUAPI_SUPPORTED_EXTENSION_XPRESS_COMPRESS;
+#endif
+       /* TODO: fill in site_guid */
+       bind_info28->site_guid                  = GUID_zero();
+       /* TODO: find out how this is really triggered! */
+       bind_info28->u1                         = 0;
+       bind_info28->repl_epoch                 = 0;
 
        return WERR_OK;
 }
+
 /*
   startup the dsdb replicator service task
 */
@@ -158,7 +156,15 @@ static void dreplsrv_task_init(struct task_server *task)
                return;
        }
 
-       service->periodic.interval      = 300; /* in seconds */
+       status = dreplsrv_load_partitions(service);
+       if (!W_ERROR_IS_OK(status)) {
+               task_server_terminate(task, talloc_asprintf(task,
+                                     "dreplsrv: Failed to load partitions: %s\n",
+                                     win_errstr(status)));
+               return;
+       }
+
+       service->periodic.interval      = lp_parm_int(-1, "dreplsrv", "periodic_interval", 300); /* in seconds */
 
        status = dreplsrv_periodic_schedule(service, service->periodic.interval);
        if (!W_ERROR_IS_OK(status)) {
index fb4fe2e50041e6a12d54be4b3fee80a11cbcee14..bcbbd3038f4dfbd48d5d6017f654090483bcd238 100644 (file)
 #ifndef _DSDB_REPL_DREPL_SERVICE_H_
 #define _DSDB_REPL_DREPL_SERVICE_H_
 
+#include "librpc/gen_ndr/ndr_drsuapi_c.h"
+
+struct dreplsrv_service;
+struct dreplsrv_partition;
+
+struct dreplsrv_drsuapi_connection {
+       /*
+        * this pipe pointer is also the indicator
+        * for a valid connection
+        */
+       struct dcerpc_pipe *pipe;
+
+       DATA_BLOB gensec_skey;
+       struct drsuapi_DsBindInfo28 remote_info28;
+       struct policy_handle bind_handle;
+};
+
+struct dreplsrv_out_connection {
+       struct dreplsrv_out_connection *prev, *next;
+
+       struct dreplsrv_service *service;
+
+       /*
+        * the binding for the outgoing connection
+        */
+       struct dcerpc_binding *binding;
+
+       /* the out going connection to the source dsa */
+       struct dreplsrv_drsuapi_connection *drsuapi;
+};
+
+struct dreplsrv_partition_source_dsa {
+       struct dreplsrv_partition_source_dsa *prev, *next;
+
+       struct dreplsrv_partition *partition;
+
+       /*
+        * the cached repsFrom value for this source dsa
+        *
+        * it needs to be updated after each DsGetNCChanges() call
+        * to the source dsa
+        *
+        * repsFrom1 == &_repsFromBlob.ctr.ctr1
+        */
+       struct repsFromToBlob _repsFromBlob;
+       struct repsFromTo1 *repsFrom1;
+
+       /* the reference to the source_dsa and its outgoing connection */
+       struct dreplsrv_out_connection *conn;
+};
+
+struct dreplsrv_partition {
+       struct dreplsrv_partition *prev, *next;
+
+       struct dreplsrv_service *service;
+
+       /* the dn of the partition */
+       struct ldb_dn *dn;
+       struct drsuapi_DsReplicaObjectIdentifier nc;
+
+       /* 
+        * uptodate vector needs to be updated before and after each DsGetNCChanges() call
+        *
+        * - before: we need to use our own invocationId together with our highestCommitedUsn
+        * - after: we need to merge in the remote uptodatevector, to avoid reading it again
+        */
+       struct replUpToDateVectorCtr2 uptodatevector;
+       struct drsuapi_DsReplicaCursorCtrEx uptodatevector_ex;
+
+       /*
+        * a linked list of all source dsa's we replicate from
+        */
+       struct dreplsrv_partition_source_dsa *sources;
+};
+
+struct dreplsrv_out_operation {
+       struct dreplsrv_out_operation *prev, *next;
+
+       struct dreplsrv_service *service;
+
+       struct dreplsrv_partition_source_dsa *source_dsa;
+
+       struct composite_context *creq;
+};
+
 struct dreplsrv_service {
        /* the whole drepl service is in one task */
        struct task_server *task;
@@ -41,6 +126,14 @@ struct dreplsrv_service {
         */
        struct ldb_context *samdb;
 
+       /* the guid of our NTDS Settings object, which never changes! */
+       struct GUID ntds_guid;
+       /*
+        * the struct holds the values used for outgoing DsBind() calls,
+        * so that we need to set them up only once
+        */
+       struct drsuapi_DsBindInfo28 bind_info28;
+
        /* some stuff for periodic processing */
        struct {
                /*
@@ -58,8 +151,26 @@ struct dreplsrv_service {
                struct timed_event *te;
        } periodic;
 
+       /*
+        * the list of partitions we need to replicate
+        */
+       struct dreplsrv_partition *partitions;
+
+       /*
+        * the list of cached connections
+        */
+       struct dreplsrv_out_connection *connections;
+
+       struct {        
+               /* the pointer to the current active operation */
+               struct dreplsrv_out_operation *current;
+
+               /* the list of pending operations */
+               struct dreplsrv_out_operation *pending;
+       } ops;
 };
 
+#include "dsdb/repl/drepl_out_helpers.h"
 #include "dsdb/repl/drepl_service_proto.h"
 
 #endif /* _DSDB_REPL_DREPL_SERVICE_H_ */