getncchanges.c: Send linked attributes in each chunk
authorTim Beale <timbeale@catalyst.net.nz>
Tue, 6 Jun 2017 22:46:47 +0000 (10:46 +1200)
committerDouglas Bagnall <dbagnall@samba.org>
Fri, 15 Sep 2017 08:07:33 +0000 (10:07 +0200)
Instead of sending all the linked attributes at the end, add a
configurable option to send the links in each replication chunk.

The benefits of this approach are:
- it can reduce memory overhead, as we don't have to keep all the links
in memory over the entire replication cycle.
- the client should never end up knowing about objects but not their
links. (Although we're not sure that this has actually resulted in
replication problems, i.e. missing links).

Note that until we support GET_TGT, this approach can mean we now send
a link where the client doesn't know about the target object, causing
the client to siliently drop that linked attribute. Hence, this option
is switched off by default.

Implementation-wise, this code works fairly the same as before. Instead
of sorting the entire getnc_state->la_sorted array at the end and then
splitting it up over chunks, we now split the links up over chunks and
then sort them when we copy them into the message. This should be OK, as
I believe the MS-DRSR Doc says the links in the message should be sorted
(rather than sorting *all* the links overall). Windows behaviour seems
to chunk the links based on USN and then sort them.

getnc_state->la_idx now tracks which links in getnc_state->la_list[]
have already been sent (instead of tracking getnc_state->la_sorted).
This means the la_sorted array no longer needs to be stored in
getnc_state and we can free the array's memory once we've copied the
links into the message. Unfortunately, the link_given/link_total debug
no longer reports the correct information, so I've moved these into
getncchanges_state struct (and now free the struct a bit later so it's
safe to reference in the debug).

The vampire_dc testenv has been updated to use this new behaviour.

Signed-off-by: Tim Beale <timbeale@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Autobuild-User(master): Douglas Bagnall <dbagnall@samba.org>
Autobuild-Date(master): Fri Sep 15 10:07:33 CEST 2017 on sn-devel-144

selftest/target/Samba4.pm
source4/rpc_server/drsuapi/getncchanges.c

index 39a64ae3d81bce2ce0b2ccd1008e534cc8883424..f0f704237c6d37569deb02fc411c0351739444a5 100755 (executable)
@@ -1288,9 +1288,12 @@ sub provision_vampire_dc($$$)
        my ($self, $prefix, $dcvars, $fl) = @_;
        print "PROVISIONING VAMPIRE DC @ FL $fl...\n";
        my $name = "localvampiredc";
        my ($self, $prefix, $dcvars, $fl) = @_;
        print "PROVISIONING VAMPIRE DC @ FL $fl...\n";
        my $name = "localvampiredc";
+       my $extra_conf = "";
 
        if ($fl == "2000") {
 
        if ($fl == "2000") {
-           $name = "vampire2000dc";
+               $name = "vampire2000dc";
+       } else {
+               $extra_conf = "drs: immediate link sync = yes";
        }
 
        # We do this so that we don't run the provision.  That's the job of 'net vampire'.
        }
 
        # We do this so that we don't run the provision.  That's the job of 'net vampire'.
@@ -1310,6 +1313,7 @@ sub provision_vampire_dc($$$)
        server max protocol = SMB2
 
         ntlm auth = mschapv2-and-ntlmv2-only
        server max protocol = SMB2
 
         ntlm auth = mschapv2-and-ntlmv2-only
+       $extra_conf
 
 [sysvol]
        path = $ctx->{statedir}/sysvol
 
 [sysvol]
        path = $ctx->{statedir}/sysvol
index 94b7d1d5294c3838d16df32b329dc0b2c9017c0a..cd3f51fc2b0135b69b2c792fd87f3081420be63d 100644 (file)
@@ -68,8 +68,11 @@ struct drsuapi_getncchanges_state {
        struct drsuapi_DsReplicaCursor2CtrEx *final_udv;
        struct drsuapi_DsReplicaLinkedAttribute *la_list;
        uint32_t la_count;
        struct drsuapi_DsReplicaCursor2CtrEx *final_udv;
        struct drsuapi_DsReplicaLinkedAttribute *la_list;
        uint32_t la_count;
-       struct la_for_sorting *la_sorted;
        uint32_t la_idx;
        uint32_t la_idx;
+
+       /* these are just used for debugging the replication's progress */
+       uint32_t links_given;
+       uint32_t total_links;
 };
 
 /* We must keep the GUIDs in NDR form for sorting */
 };
 
 /* We must keep the GUIDs in NDR form for sorting */
@@ -2369,8 +2372,6 @@ WERROR dcesrv_drsuapi_DsGetNCChanges(struct dcesrv_call_state *dce_call, TALLOC_
        uint32_t max_objects;
        uint32_t max_links;
        uint32_t link_count = 0;
        uint32_t max_objects;
        uint32_t max_links;
        uint32_t link_count = 0;
-       uint32_t link_total = 0;
-       uint32_t link_given = 0;
        struct ldb_dn *search_dn = NULL;
        bool am_rodc;
        enum security_user_level security_level;
        struct ldb_dn *search_dn = NULL;
        bool am_rodc;
        enum security_user_level security_level;
@@ -2389,6 +2390,7 @@ WERROR dcesrv_drsuapi_DsGetNCChanges(struct dcesrv_call_state *dce_call, TALLOC_
        bool full = true;
        uint32_t *local_pas = NULL;
        struct ldb_dn *machine_dn = NULL; /* Only used for REPL SECRET EXOP */
        bool full = true;
        uint32_t *local_pas = NULL;
        struct ldb_dn *machine_dn = NULL; /* Only used for REPL SECRET EXOP */
+       bool immediate_link_sync;
 
        DCESRV_PULL_HANDLE_WERR(h, r->in.bind_handle, DRSUAPI_BIND_HANDLE);
        b_state = h->data;
 
        DCESRV_PULL_HANDLE_WERR(h, r->in.bind_handle, DRSUAPI_BIND_HANDLE);
        b_state = h->data;
@@ -2906,6 +2908,9 @@ allowed:
         */
        max_links = lpcfg_parm_int(dce_call->conn->dce_ctx->lp_ctx, NULL, "drs", "max link sync", 1500);
 
         */
        max_links = lpcfg_parm_int(dce_call->conn->dce_ctx->lp_ctx, NULL, "drs", "max link sync", 1500);
 
+       immediate_link_sync = lpcfg_parm_bool(dce_call->conn->dce_ctx->lp_ctx, NULL,
+                                             "drs", "immediate link sync", false);
+
        /*
         * Maximum time that we can spend in a getncchanges
         * in order to avoid timeout of the other part.
        /*
         * Maximum time that we can spend in a getncchanges
         * in order to avoid timeout of the other part.
@@ -2954,6 +2959,7 @@ allowed:
                struct ldb_dn *msg_dn;
                bool obj_already_sent = false;
                TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
                struct ldb_dn *msg_dn;
                bool obj_already_sent = false;
                TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
+               uint32_t old_la_index;
 
                msg_dn = ldb_dn_new_fmt(tmp_ctx, sam_ctx, "<GUID=%s>",
                                        GUID_string(tmp_ctx, &getnc_state->guids[i]));
 
                msg_dn = ldb_dn_new_fmt(tmp_ctx, sam_ctx, "<GUID=%s>",
                                        GUID_string(tmp_ctx, &getnc_state->guids[i]));
@@ -3023,6 +3029,8 @@ allowed:
                        }
                }
 
                        }
                }
 
+               old_la_index = getnc_state->la_count;
+
                /*
                 * We've reached the USN where this object naturally occurs.
                 * Regardless of whether we've already sent the object (as an
                /*
                 * We've reached the USN where this object naturally occurs.
                 * Regardless of whether we've already sent the object (as an
@@ -3060,11 +3068,17 @@ allowed:
                         new_objs ? "replicating" : "skipping send of",
                         ldb_dn_get_linearized(msg->dn)));
 
                         new_objs ? "replicating" : "skipping send of",
                         ldb_dn_get_linearized(msg->dn)));
 
+               getnc_state->total_links += (getnc_state->la_count - old_la_index);
+
                TALLOC_FREE(tmp_ctx);
        }
 
        getnc_state->num_processed = i;
 
                TALLOC_FREE(tmp_ctx);
        }
 
        getnc_state->num_processed = i;
 
+       if (i < getnc_state->num_records) {
+               r->out.ctr->ctr6.more_data = true;
+       }
+
        /* the client can us to call UpdateRefs on its behalf to
           re-establish monitoring of the NC */
        if ((req10->replica_flags & (DRSUAPI_DRS_ADD_REF | DRSUAPI_DRS_REF_GCSPN)) &&
        /* the client can us to call UpdateRefs on its behalf to
           re-establish monitoring of the NC */
        if ((req10->replica_flags & (DRSUAPI_DRS_ADD_REF | DRSUAPI_DRS_REF_GCSPN)) &&
@@ -3110,25 +3124,32 @@ allowed:
                max_links -= r->out.ctr->ctr6.object_count;
        }
 
                max_links -= r->out.ctr->ctr6.object_count;
        }
 
-       link_total = getnc_state->la_count;
-
-       if (i < getnc_state->num_records) {
-               r->out.ctr->ctr6.more_data = true;
-       } else {
-               /* sort the whole array the first time */
-               if (getnc_state->la_sorted == NULL) {
-                       werr = getncchanges_get_sorted_array(getnc_state->la_list,
-                                                            getnc_state->la_count,
-                                                            sam_ctx, getnc_state,
-                                                            schema,
-                                                            &getnc_state->la_sorted);
-                       if (!W_ERROR_IS_OK(werr)) {
-                               return werr;
-                       }
-               }
-
+       /*
+        * Work out how many links we can send in this chunk. The default is to
+        * send all the links last, but there is a config option to send them
+        * immediately, in the same chunk as their source object
+        */
+       if (!r->out.ctr->ctr6.more_data || immediate_link_sync) {
                link_count = getnc_state->la_count - getnc_state->la_idx;
                link_count = MIN(max_links, link_count);
                link_count = getnc_state->la_count - getnc_state->la_idx;
                link_count = MIN(max_links, link_count);
+       }
+
+       /* If we've got linked attributes to send, add them now */
+       if (link_count > 0) {
+               struct la_for_sorting *la_sorted;
+
+               /*
+                * Grab a chunk of linked attributes off the list and put them
+                * in sorted array, ready to send
+                */
+               werr = getncchanges_get_sorted_array(&getnc_state->la_list[getnc_state->la_idx],
+                                                    link_count,
+                                                    sam_ctx, getnc_state,
+                                                    schema,
+                                                    &la_sorted);
+               if (!W_ERROR_IS_OK(werr)) {
+                       return werr;
+               }
 
                r->out.ctr->ctr6.linked_attributes_count = link_count;
                r->out.ctr->ctr6.linked_attributes = talloc_array(r->out.ctr, struct drsuapi_DsReplicaLinkedAttribute, link_count);
 
                r->out.ctr->ctr6.linked_attributes_count = link_count;
                r->out.ctr->ctr6.linked_attributes = talloc_array(r->out.ctr, struct drsuapi_DsReplicaLinkedAttribute, link_count);
@@ -3138,16 +3159,29 @@ allowed:
                }
 
                for (k = 0; k < link_count; k++) {
                }
 
                for (k = 0; k < link_count; k++) {
-                       r->out.ctr->ctr6.linked_attributes[k]
-                               = *getnc_state->la_sorted[getnc_state->la_idx + k].link;
+                       r->out.ctr->ctr6.linked_attributes[k] = *la_sorted[k].link;
                }
 
                getnc_state->la_idx += link_count;
                }
 
                getnc_state->la_idx += link_count;
-               link_given = getnc_state->la_idx;
+               getnc_state->links_given += link_count;
 
                if (getnc_state->la_idx < getnc_state->la_count) {
                        r->out.ctr->ctr6.more_data = true;
 
                if (getnc_state->la_idx < getnc_state->la_count) {
                        r->out.ctr->ctr6.more_data = true;
+               } else {
+
+                       /*
+                        * We've now sent all the links seen so far, so we can
+                        * reset la_list back to an empty list again. Note that
+                        * the steal means the linked attribute memory gets
+                        * freed after this RPC message is sent on the wire.
+                        */
+                       talloc_steal(mem_ctx, getnc_state->la_list);
+                       getnc_state->la_list = NULL;
+                       getnc_state->la_idx = 0;
+                       getnc_state->la_count = 0;
                }
                }
+
+               TALLOC_FREE(la_sorted);
        }
 
        if (req10->replica_flags & DRSUAPI_DRS_GET_NC_SIZE) {
        }
 
        if (req10->replica_flags & DRSUAPI_DRS_GET_NC_SIZE) {
@@ -3163,17 +3197,23 @@ allowed:
                 * of links we found so far during the cycle.
                 */
                r->out.ctr->ctr6.nc_object_count = getnc_state->num_records;
                 * of links we found so far during the cycle.
                 */
                r->out.ctr->ctr6.nc_object_count = getnc_state->num_records;
-               r->out.ctr->ctr6.nc_linked_attributes_count = getnc_state->la_count;
+               r->out.ctr->ctr6.nc_linked_attributes_count = getnc_state->total_links;
        }
 
        if (!r->out.ctr->ctr6.more_data) {
        }
 
        if (!r->out.ctr->ctr6.more_data) {
-               talloc_steal(mem_ctx, getnc_state->la_list);
 
 
+               /* this is the last response in the replication cycle */
                r->out.ctr->ctr6.new_highwatermark = getnc_state->final_hwm;
                r->out.ctr->ctr6.uptodateness_vector = talloc_move(mem_ctx,
                                                        &getnc_state->final_udv);
 
                r->out.ctr->ctr6.new_highwatermark = getnc_state->final_hwm;
                r->out.ctr->ctr6.uptodateness_vector = talloc_move(mem_ctx,
                                                        &getnc_state->final_udv);
 
-               talloc_free(getnc_state);
+               /*
+                * Free the state info stored for the replication cycle. Note
+                * that the RPC message we're sending contains links stored in
+                * getnc_state. mem_ctx is local to this RPC call, so the memory
+                * will get freed after the RPC message is sent on the wire.
+                */
+               talloc_steal(mem_ctx, getnc_state);
                b_state->getncchanges_state = NULL;
        } else {
                ret = drsuapi_DsReplicaHighWaterMark_cmp(&r->out.ctr->ctr6.old_highwatermark,
                b_state->getncchanges_state = NULL;
        } else {
                ret = drsuapi_DsReplicaHighWaterMark_cmp(&r->out.ctr->ctr6.old_highwatermark,
@@ -3209,7 +3249,7 @@ allowed:
               r->out.ctr->ctr6.object_count,
               i, r->out.ctr->ctr6.more_data?getnc_state->num_records:i,
               r->out.ctr->ctr6.linked_attributes_count,
               r->out.ctr->ctr6.object_count,
               i, r->out.ctr->ctr6.more_data?getnc_state->num_records:i,
               r->out.ctr->ctr6.linked_attributes_count,
-              link_given, link_total,
+              getnc_state->links_given, getnc_state->total_links,
               dom_sid_string(mem_ctx, user_sid)));
 
 #if 0
               dom_sid_string(mem_ctx, user_sid)));
 
 #if 0