getncchanges.c: Send linked attributes in each chunk
[nivanova/samba-autobuild/.git] / source4 / rpc_server / drsuapi / getncchanges.c
index 94b7d1d5294c3838d16df32b329dc0b2c9017c0a..cd3f51fc2b0135b69b2c792fd87f3081420be63d 100644 (file)
@@ -68,8 +68,11 @@ struct drsuapi_getncchanges_state {
        struct drsuapi_DsReplicaCursor2CtrEx *final_udv;
        struct drsuapi_DsReplicaLinkedAttribute *la_list;
        uint32_t la_count;
-       struct la_for_sorting *la_sorted;
        uint32_t la_idx;
+
+       /* these are just used for debugging the replication's progress */
+       uint32_t links_given;
+       uint32_t total_links;
 };
 
 /* We must keep the GUIDs in NDR form for sorting */
@@ -2369,8 +2372,6 @@ WERROR dcesrv_drsuapi_DsGetNCChanges(struct dcesrv_call_state *dce_call, TALLOC_
        uint32_t max_objects;
        uint32_t max_links;
        uint32_t link_count = 0;
-       uint32_t link_total = 0;
-       uint32_t link_given = 0;
        struct ldb_dn *search_dn = NULL;
        bool am_rodc;
        enum security_user_level security_level;
@@ -2389,6 +2390,7 @@ WERROR dcesrv_drsuapi_DsGetNCChanges(struct dcesrv_call_state *dce_call, TALLOC_
        bool full = true;
        uint32_t *local_pas = NULL;
        struct ldb_dn *machine_dn = NULL; /* Only used for REPL SECRET EXOP */
+       bool immediate_link_sync;
 
        DCESRV_PULL_HANDLE_WERR(h, r->in.bind_handle, DRSUAPI_BIND_HANDLE);
        b_state = h->data;
@@ -2906,6 +2908,9 @@ allowed:
         */
        max_links = lpcfg_parm_int(dce_call->conn->dce_ctx->lp_ctx, NULL, "drs", "max link sync", 1500);
 
+       immediate_link_sync = lpcfg_parm_bool(dce_call->conn->dce_ctx->lp_ctx, NULL,
+                                             "drs", "immediate link sync", false);
+
        /*
         * Maximum time that we can spend in a getncchanges
         * in order to avoid timeout of the other part.
@@ -2954,6 +2959,7 @@ allowed:
                struct ldb_dn *msg_dn;
                bool obj_already_sent = false;
                TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
+               uint32_t old_la_index;
 
                msg_dn = ldb_dn_new_fmt(tmp_ctx, sam_ctx, "<GUID=%s>",
                                        GUID_string(tmp_ctx, &getnc_state->guids[i]));
@@ -3023,6 +3029,8 @@ allowed:
                        }
                }
 
+               old_la_index = getnc_state->la_count;
+
                /*
                 * We've reached the USN where this object naturally occurs.
                 * Regardless of whether we've already sent the object (as an
@@ -3060,11 +3068,17 @@ allowed:
                         new_objs ? "replicating" : "skipping send of",
                         ldb_dn_get_linearized(msg->dn)));
 
+               getnc_state->total_links += (getnc_state->la_count - old_la_index);
+
                TALLOC_FREE(tmp_ctx);
        }
 
        getnc_state->num_processed = i;
 
+       if (i < getnc_state->num_records) {
+               r->out.ctr->ctr6.more_data = true;
+       }
+
        /* the client can us to call UpdateRefs on its behalf to
           re-establish monitoring of the NC */
        if ((req10->replica_flags & (DRSUAPI_DRS_ADD_REF | DRSUAPI_DRS_REF_GCSPN)) &&
@@ -3110,25 +3124,32 @@ allowed:
                max_links -= r->out.ctr->ctr6.object_count;
        }
 
-       link_total = getnc_state->la_count;
-
-       if (i < getnc_state->num_records) {
-               r->out.ctr->ctr6.more_data = true;
-       } else {
-               /* sort the whole array the first time */
-               if (getnc_state->la_sorted == NULL) {
-                       werr = getncchanges_get_sorted_array(getnc_state->la_list,
-                                                            getnc_state->la_count,
-                                                            sam_ctx, getnc_state,
-                                                            schema,
-                                                            &getnc_state->la_sorted);
-                       if (!W_ERROR_IS_OK(werr)) {
-                               return werr;
-                       }
-               }
-
+       /*
+        * Work out how many links we can send in this chunk. The default is to
+        * send all the links last, but there is a config option to send them
+        * immediately, in the same chunk as their source object
+        */
+       if (!r->out.ctr->ctr6.more_data || immediate_link_sync) {
                link_count = getnc_state->la_count - getnc_state->la_idx;
                link_count = MIN(max_links, link_count);
+       }
+
+       /* If we've got linked attributes to send, add them now */
+       if (link_count > 0) {
+               struct la_for_sorting *la_sorted;
+
+               /*
+                * Grab a chunk of linked attributes off the list and put them
+                * in sorted array, ready to send
+                */
+               werr = getncchanges_get_sorted_array(&getnc_state->la_list[getnc_state->la_idx],
+                                                    link_count,
+                                                    sam_ctx, getnc_state,
+                                                    schema,
+                                                    &la_sorted);
+               if (!W_ERROR_IS_OK(werr)) {
+                       return werr;
+               }
 
                r->out.ctr->ctr6.linked_attributes_count = link_count;
                r->out.ctr->ctr6.linked_attributes = talloc_array(r->out.ctr, struct drsuapi_DsReplicaLinkedAttribute, link_count);
@@ -3138,16 +3159,29 @@ allowed:
                }
 
                for (k = 0; k < link_count; k++) {
-                       r->out.ctr->ctr6.linked_attributes[k]
-                               = *getnc_state->la_sorted[getnc_state->la_idx + k].link;
+                       r->out.ctr->ctr6.linked_attributes[k] = *la_sorted[k].link;
                }
 
                getnc_state->la_idx += link_count;
-               link_given = getnc_state->la_idx;
+               getnc_state->links_given += link_count;
 
                if (getnc_state->la_idx < getnc_state->la_count) {
                        r->out.ctr->ctr6.more_data = true;
+               } else {
+
+                       /*
+                        * We've now sent all the links seen so far, so we can
+                        * reset la_list back to an empty list again. Note that
+                        * the steal means the linked attribute memory gets
+                        * freed after this RPC message is sent on the wire.
+                        */
+                       talloc_steal(mem_ctx, getnc_state->la_list);
+                       getnc_state->la_list = NULL;
+                       getnc_state->la_idx = 0;
+                       getnc_state->la_count = 0;
                }
+
+               TALLOC_FREE(la_sorted);
        }
 
        if (req10->replica_flags & DRSUAPI_DRS_GET_NC_SIZE) {
@@ -3163,17 +3197,23 @@ allowed:
                 * of links we found so far during the cycle.
                 */
                r->out.ctr->ctr6.nc_object_count = getnc_state->num_records;
-               r->out.ctr->ctr6.nc_linked_attributes_count = getnc_state->la_count;
+               r->out.ctr->ctr6.nc_linked_attributes_count = getnc_state->total_links;
        }
 
        if (!r->out.ctr->ctr6.more_data) {
-               talloc_steal(mem_ctx, getnc_state->la_list);
 
+               /* this is the last response in the replication cycle */
                r->out.ctr->ctr6.new_highwatermark = getnc_state->final_hwm;
                r->out.ctr->ctr6.uptodateness_vector = talloc_move(mem_ctx,
                                                        &getnc_state->final_udv);
 
-               talloc_free(getnc_state);
+               /*
+                * Free the state info stored for the replication cycle. Note
+                * that the RPC message we're sending contains links stored in
+                * getnc_state. mem_ctx is local to this RPC call, so the memory
+                * will get freed after the RPC message is sent on the wire.
+                */
+               talloc_steal(mem_ctx, getnc_state);
                b_state->getncchanges_state = NULL;
        } else {
                ret = drsuapi_DsReplicaHighWaterMark_cmp(&r->out.ctr->ctr6.old_highwatermark,
@@ -3209,7 +3249,7 @@ allowed:
               r->out.ctr->ctr6.object_count,
               i, r->out.ctr->ctr6.more_data?getnc_state->num_records:i,
               r->out.ctr->ctr6.linked_attributes_count,
-              link_given, link_total,
+              getnc_state->links_given, getnc_state->total_links,
               dom_sid_string(mem_ctx, user_sid)));
 
 #if 0