s4-drs: mark WERR_DS_DRA_BUSY as a non error in DsReplicaUpdateRefs
[ira/wip.git] / source4 / dsdb / repl / drepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    DSDB replication service helper function for outgoing traffic
4    
5    Copyright (C) Stefan Metzmacher 2007
6     
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19    
20 */
21
22 #include "includes.h"
23 #include "dsdb/samdb/samdb.h"
24 #include "auth/auth.h"
25 #include "smbd/service.h"
26 #include "lib/events/events.h"
27 #include "dsdb/repl/drepl_service.h"
28 #include <ldb_errors.h>
29 #include "../lib/util/dlinklist.h"
30 #include "librpc/gen_ndr/ndr_misc.h"
31 #include "librpc/gen_ndr/ndr_drsuapi.h"
32 #include "librpc/gen_ndr/ndr_drsblobs.h"
33 #include "libcli/composite/composite.h"
34 #include "auth/gensec/gensec.h"
35 #include "param/param.h"
36 #include "../lib/util/tevent_ntstatus.h"
37 #include "libcli/security/security.h"
38
39 struct dreplsrv_out_drsuapi_state {
40         struct tevent_context *ev;
41
42         struct dreplsrv_out_connection *conn;
43
44         struct dreplsrv_drsuapi_connection *drsuapi;
45
46         struct drsuapi_DsBindInfoCtr bind_info_ctr;
47         struct drsuapi_DsBind bind_r;
48 };
49
50 static void dreplsrv_out_drsuapi_connect_done(struct composite_context *creq);
51
52 struct tevent_req *dreplsrv_out_drsuapi_send(TALLOC_CTX *mem_ctx,
53                                              struct tevent_context *ev,
54                                              struct dreplsrv_out_connection *conn)
55 {
56         struct tevent_req *req;
57         struct dreplsrv_out_drsuapi_state *state;
58         struct composite_context *creq;
59
60         req = tevent_req_create(mem_ctx, &state,
61                                 struct dreplsrv_out_drsuapi_state);
62         if (req == NULL) {
63                 return NULL;
64         }
65
66         state->ev       = ev;
67         state->conn     = conn;
68         state->drsuapi  = conn->drsuapi;
69
70         if (state->drsuapi && !state->drsuapi->pipe->conn->dead) {
71                 tevent_req_done(req);
72                 return tevent_req_post(req, ev);
73         }
74
75         if (state->drsuapi && state->drsuapi->pipe->conn->dead) {
76                 talloc_free(state->drsuapi);
77                 conn->drsuapi = NULL;
78         }
79
80         state->drsuapi = talloc_zero(state, struct dreplsrv_drsuapi_connection);
81         if (tevent_req_nomem(state->drsuapi, req)) {
82                 return tevent_req_post(req, ev);
83         }
84
85         creq = dcerpc_pipe_connect_b_send(state, conn->binding, &ndr_table_drsuapi,
86                                           conn->service->system_session_info->credentials,
87                                           ev, conn->service->task->lp_ctx);
88         if (tevent_req_nomem(creq, req)) {
89                 return tevent_req_post(req, ev);
90         }
91         composite_continue(NULL, creq, dreplsrv_out_drsuapi_connect_done, req);
92
93         return req;
94 }
95
96 static void dreplsrv_out_drsuapi_bind_done(struct tevent_req *subreq);
97
98 static void dreplsrv_out_drsuapi_connect_done(struct composite_context *creq)
99 {
100         struct tevent_req *req = talloc_get_type(creq->async.private_data,
101                                                  struct tevent_req);
102         struct dreplsrv_out_drsuapi_state *state = tevent_req_data(req,
103                                                    struct dreplsrv_out_drsuapi_state);
104         NTSTATUS status;
105         struct tevent_req *subreq;
106
107         status = dcerpc_pipe_connect_b_recv(creq,
108                                             state->drsuapi,
109                                             &state->drsuapi->pipe);
110         if (tevent_req_nterror(req, status)) {
111                 return;
112         }
113
114         state->drsuapi->drsuapi_handle = state->drsuapi->pipe->binding_handle;
115
116         status = gensec_session_key(state->drsuapi->pipe->conn->security_state.generic_state,
117                                     state->drsuapi,
118                                     &state->drsuapi->gensec_skey);
119         if (tevent_req_nterror(req, status)) {
120                 return;
121         }
122
123         state->bind_info_ctr.length             = 28;
124         state->bind_info_ctr.info.info28        = state->conn->service->bind_info28;
125
126         state->bind_r.in.bind_guid = &state->conn->service->ntds_guid;
127         state->bind_r.in.bind_info = &state->bind_info_ctr;
128         state->bind_r.out.bind_handle = &state->drsuapi->bind_handle;
129
130         subreq = dcerpc_drsuapi_DsBind_r_send(state,
131                                               state->ev,
132                                               state->drsuapi->drsuapi_handle,
133                                               &state->bind_r);
134         if (tevent_req_nomem(subreq, req)) {
135                 return;
136         }
137         tevent_req_set_callback(subreq, dreplsrv_out_drsuapi_bind_done, req);
138 }
139
140 static void dreplsrv_out_drsuapi_bind_done(struct tevent_req *subreq)
141 {
142         struct tevent_req *req = tevent_req_callback_data(subreq,
143                                  struct tevent_req);
144         struct dreplsrv_out_drsuapi_state *state = tevent_req_data(req,
145                                                    struct dreplsrv_out_drsuapi_state);
146         NTSTATUS status;
147
148         status = dcerpc_drsuapi_DsBind_r_recv(subreq, state);
149         TALLOC_FREE(subreq);
150         if (tevent_req_nterror(req, status)) {
151                 return;
152         }
153
154         if (!W_ERROR_IS_OK(state->bind_r.out.result)) {
155                 status = werror_to_ntstatus(state->bind_r.out.result);
156                 tevent_req_nterror(req, status);
157                 return;
158         }
159
160         ZERO_STRUCT(state->drsuapi->remote_info28);
161         if (state->bind_r.out.bind_info) {
162                 struct drsuapi_DsBindInfo28 *info28;
163                 info28 = &state->drsuapi->remote_info28;
164
165                 switch (state->bind_r.out.bind_info->length) {
166                 case 24: {
167                         struct drsuapi_DsBindInfo24 *info24;
168                         info24 = &state->bind_r.out.bind_info->info.info24;
169
170                         info28->supported_extensions    = info24->supported_extensions;
171                         info28->site_guid               = info24->site_guid;
172                         info28->pid                     = info24->pid;
173                         info28->repl_epoch              = 0;
174                         break;
175                 }
176                 case 48: {
177                         struct drsuapi_DsBindInfo48 *info48;
178                         info48 = &state->bind_r.out.bind_info->info.info48;
179
180                         info28->supported_extensions    = info48->supported_extensions;
181                         info28->site_guid               = info48->site_guid;
182                         info28->pid                     = info48->pid;
183                         info28->repl_epoch              = info48->repl_epoch;
184                         break;
185                 }
186                 case 28:
187                         *info28 = state->bind_r.out.bind_info->info.info28;
188                         break;
189                 }
190         }
191
192         tevent_req_done(req);
193 }
194
195 NTSTATUS dreplsrv_out_drsuapi_recv(struct tevent_req *req)
196 {
197         struct dreplsrv_out_drsuapi_state *state = tevent_req_data(req,
198                                                    struct dreplsrv_out_drsuapi_state);
199         NTSTATUS status;
200
201         if (tevent_req_is_nterror(req, &status)) {
202                 tevent_req_received(req);
203                 return status;
204         }
205
206         state->conn->drsuapi = talloc_move(state->conn, &state->drsuapi);
207
208         tevent_req_received(req);
209         return NT_STATUS_OK;
210 }
211
/*
  state of one dreplsrv_op_pull_source_send() request
 */
struct dreplsrv_op_pull_source_state {
	/* event context the sub-requests are sent on */
	struct tevent_context *ev;

	/* the outgoing replication operation being processed */
	struct dreplsrv_out_operation *op;

	/*
	 * the DsGetNCChanges call structure that is currently in
	 * flight; set when the call is sent and cleared again by the
	 * completion callback
	 */
	void *ndr_struct_ptr;
};
217
218 static void dreplsrv_op_pull_source_connect_done(struct tevent_req *subreq);
219
220 struct tevent_req *dreplsrv_op_pull_source_send(TALLOC_CTX *mem_ctx,
221                                                 struct tevent_context *ev,
222                                                 struct dreplsrv_out_operation *op)
223 {
224         struct tevent_req *req;
225         struct dreplsrv_op_pull_source_state *state;
226         struct tevent_req *subreq;
227
228         req = tevent_req_create(mem_ctx, &state,
229                                 struct dreplsrv_op_pull_source_state);
230         if (req == NULL) {
231                 return NULL;
232         }
233         state->ev = ev;
234         state->op = op;
235
236         subreq = dreplsrv_out_drsuapi_send(state, ev, op->source_dsa->conn);
237         if (tevent_req_nomem(subreq, req)) {
238                 return tevent_req_post(req, ev);
239         }
240         tevent_req_set_callback(subreq, dreplsrv_op_pull_source_connect_done, req);
241
242         return req;
243 }
244
245 static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req);
246
247 static void dreplsrv_op_pull_source_connect_done(struct tevent_req *subreq)
248 {
249         struct tevent_req *req = tevent_req_callback_data(subreq,
250                                  struct tevent_req);
251         NTSTATUS status;
252
253         status = dreplsrv_out_drsuapi_recv(subreq);
254         TALLOC_FREE(subreq);
255         if (tevent_req_nterror(req, status)) {
256                 return;
257         }
258
259         dreplsrv_op_pull_source_get_changes_trigger(req);
260 }
261
262 static void dreplsrv_op_pull_source_get_changes_done(struct tevent_req *subreq);
263
264 /*
265   get a RODC partial attribute set for a replication call
266  */
267 static NTSTATUS dreplsrv_get_rodc_partial_attribute_set(struct dreplsrv_service *service,
268                                                         TALLOC_CTX *mem_ctx,
269                                                         struct drsuapi_DsPartialAttributeSet **_pas,
270                                                         bool for_schema)
271 {
272         struct drsuapi_DsPartialAttributeSet *pas;
273         struct dsdb_schema *schema;
274         uint32_t i;
275
276         pas = talloc_zero(mem_ctx, struct drsuapi_DsPartialAttributeSet);
277         NT_STATUS_HAVE_NO_MEMORY(pas);
278
279         schema = dsdb_get_schema(service->samdb, NULL);
280
281         pas->version = 1;
282         pas->attids = talloc_array(pas, enum drsuapi_DsAttributeId, schema->num_attributes);
283         NT_STATUS_HAVE_NO_MEMORY_AND_FREE(pas->attids, pas);
284
285         for (i=0; i<schema->num_attributes; i++) {
286                 struct dsdb_attribute *a;
287                 a = schema->attributes_by_attributeID_id[i];
288                 if (a->systemFlags & (DS_FLAG_ATTR_NOT_REPLICATED | DS_FLAG_ATTR_IS_CONSTRUCTED)) {
289                         continue;
290                 }
291                 if (a->searchFlags & SEARCH_FLAG_RODC_ATTRIBUTE) {
292                         continue;
293                 }
294                 pas->attids[pas->num_attids] = dsdb_attribute_get_attid(a, for_schema);
295                 pas->num_attids++;
296         }
297
298         pas->attids = talloc_realloc(pas, pas->attids, enum drsuapi_DsAttributeId, pas->num_attids);
299         NT_STATUS_HAVE_NO_MEMORY_AND_FREE(pas->attids, pas);
300
301         *_pas = pas;
302         return NT_STATUS_OK;
303 }
304
305
306 /*
307   get a GC partial attribute set for a replication call
308  */
309 static NTSTATUS dreplsrv_get_gc_partial_attribute_set(struct dreplsrv_service *service,
310                                                       TALLOC_CTX *mem_ctx,
311                                                       struct drsuapi_DsPartialAttributeSet **_pas)
312 {
313         struct drsuapi_DsPartialAttributeSet *pas;
314         struct dsdb_schema *schema;
315         uint32_t i;
316
317         pas = talloc_zero(mem_ctx, struct drsuapi_DsPartialAttributeSet);
318         NT_STATUS_HAVE_NO_MEMORY(pas);
319
320         schema = dsdb_get_schema(service->samdb, NULL);
321
322         pas->version = 1;
323         pas->attids = talloc_array(pas, enum drsuapi_DsAttributeId, schema->num_attributes);
324         NT_STATUS_HAVE_NO_MEMORY_AND_FREE(pas->attids, pas);
325
326         for (i=0; i<schema->num_attributes; i++) {
327                 struct dsdb_attribute *a;
328                 a = schema->attributes_by_attributeID_id[i];
329                 if (a->isMemberOfPartialAttributeSet) {
330                         pas->attids[pas->num_attids] = dsdb_attribute_get_attid(a, false);
331                         pas->num_attids++;
332                 }
333         }
334
335         pas->attids = talloc_realloc(pas, pas->attids, enum drsuapi_DsAttributeId, pas->num_attids);
336         NT_STATUS_HAVE_NO_MEMORY_AND_FREE(pas->attids, pas);
337
338         *_pas = pas;
339         return NT_STATUS_OK;
340 }
341
342 /*
343   convert from one udv format to the other
344  */
345 static WERROR udv_convert(TALLOC_CTX *mem_ctx,
346                           const struct replUpToDateVectorCtr2 *udv,
347                           struct drsuapi_DsReplicaCursorCtrEx *udv_ex)
348 {
349         uint32_t i;
350
351         udv_ex->version = 2;
352         udv_ex->reserved1 = 0;
353         udv_ex->reserved2 = 0;
354         udv_ex->count = udv->count;
355         udv_ex->cursors = talloc_array(mem_ctx, struct drsuapi_DsReplicaCursor, udv->count);
356         W_ERROR_HAVE_NO_MEMORY(udv_ex->cursors);
357
358         for (i=0; i<udv->count; i++) {
359                 udv_ex->cursors[i].source_dsa_invocation_id = udv->cursors[i].source_dsa_invocation_id;
360                 udv_ex->cursors[i].highest_usn = udv->cursors[i].highest_usn;
361         }
362
363         return WERR_OK;
364 }
365
366
367 static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
368 {
369         struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
370                                                       struct dreplsrv_op_pull_source_state);
371         struct repsFromTo1 *rf1 = state->op->source_dsa->repsFrom1;
372         struct dreplsrv_service *service = state->op->service;
373         struct dreplsrv_partition *partition = state->op->source_dsa->partition;
374         struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
375         struct drsuapi_DsGetNCChanges *r;
376         struct drsuapi_DsReplicaCursorCtrEx *uptodateness_vector;
377         struct tevent_req *subreq;
378         struct drsuapi_DsPartialAttributeSet *pas = NULL;
379         NTSTATUS status;
380         uint32_t replica_flags;
381         struct drsuapi_DsReplicaHighWaterMark highwatermark;
382
383         r = talloc(state, struct drsuapi_DsGetNCChanges);
384         if (tevent_req_nomem(r, req)) {
385                 return;
386         }
387
388         r->out.level_out = talloc(r, uint32_t);
389         if (tevent_req_nomem(r->out.level_out, req)) {
390                 return;
391         }
392         r->in.req = talloc(r, union drsuapi_DsGetNCChangesRequest);
393         if (tevent_req_nomem(r->in.req, req)) {
394                 return;
395         }
396         r->out.ctr = talloc(r, union drsuapi_DsGetNCChangesCtr);
397         if (tevent_req_nomem(r->out.ctr, req)) {
398                 return;
399         }
400
401         if (partition->uptodatevector.count != 0 &&
402             partition->uptodatevector_ex.count == 0) {
403                 WERROR werr;
404                 werr = udv_convert(partition, &partition->uptodatevector, &partition->uptodatevector_ex);
405                 if (!W_ERROR_IS_OK(werr)) {
406                         DEBUG(0,(__location__ ": Failed to convert UDV for %s : %s\n",
407                                  ldb_dn_get_linearized(partition->dn), win_errstr(werr)));
408                 }
409         }
410
411         if (partition->uptodatevector_ex.count == 0) {
412                 uptodateness_vector = NULL;
413         } else {
414                 uptodateness_vector = &partition->uptodatevector_ex;
415         }
416
417         replica_flags = rf1->replica_flags;
418         highwatermark = rf1->highwatermark;
419
420         if (partition->partial_replica) {
421                 status = dreplsrv_get_gc_partial_attribute_set(service, r, &pas);
422                 if (!NT_STATUS_IS_OK(status)) {
423                         DEBUG(0,(__location__ ": Failed to construct GC partial attribute set : %s\n", nt_errstr(status)));
424                         return;
425                 }
426                 replica_flags &= ~DRSUAPI_DRS_WRIT_REP;
427         } else if (partition->rodc_replica) {
428                 bool for_schema = false;
429                 if (ldb_dn_compare_base(ldb_get_schema_basedn(service->samdb), partition->dn) == 0) {
430                         for_schema = true;
431                 }
432
433                 status = dreplsrv_get_rodc_partial_attribute_set(service, r, &pas, for_schema);
434                 if (!NT_STATUS_IS_OK(status)) {
435                         DEBUG(0,(__location__ ": Failed to construct RODC partial attribute set : %s\n", nt_errstr(status)));
436                         return;
437                 }
438                 if (state->op->extended_op == DRSUAPI_EXOP_REPL_SECRET) {
439                         replica_flags &= ~DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING;
440                 }
441         }
442
443         /* is this a full resync of all objects? */
444         if (state->op->options & DRSUAPI_DRS_FULL_SYNC_NOW) {
445                 ZERO_STRUCT(highwatermark);
446                 /* clear the FULL_SYNC_NOW option for subsequent
447                    stages of the replication cycle */
448                 state->op->options &= ~DRSUAPI_DRS_FULL_SYNC_NOW;
449                 state->op->options |= DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS;
450                 replica_flags |= DRSUAPI_DRS_NEVER_SYNCED;
451         }
452         if (state->op->options & DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS) {
453                 uptodateness_vector = NULL;
454         }
455
456         r->in.bind_handle       = &drsuapi->bind_handle;
457         if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
458                 r->in.level                             = 8;
459                 r->in.req->req8.destination_dsa_guid    = service->ntds_guid;
460                 r->in.req->req8.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
461                 r->in.req->req8.naming_context          = &partition->nc;
462                 r->in.req->req8.highwatermark           = highwatermark;
463                 r->in.req->req8.uptodateness_vector     = uptodateness_vector;
464                 r->in.req->req8.replica_flags           = replica_flags;
465                 r->in.req->req8.max_object_count        = 133;
466                 r->in.req->req8.max_ndr_size            = 1336811;
467                 r->in.req->req8.extended_op             = state->op->extended_op;
468                 r->in.req->req8.fsmo_info               = state->op->fsmo_info;
469                 r->in.req->req8.partial_attribute_set   = pas;
470                 r->in.req->req8.partial_attribute_set_ex= NULL;
471                 r->in.req->req8.mapping_ctr.num_mappings= 0;
472                 r->in.req->req8.mapping_ctr.mappings    = NULL;
473         } else {
474                 r->in.level                             = 5;
475                 r->in.req->req5.destination_dsa_guid    = service->ntds_guid;
476                 r->in.req->req5.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
477                 r->in.req->req5.naming_context          = &partition->nc;
478                 r->in.req->req5.highwatermark           = highwatermark;
479                 r->in.req->req5.uptodateness_vector     = uptodateness_vector;
480                 r->in.req->req5.replica_flags           = replica_flags;
481                 r->in.req->req5.max_object_count        = 133;
482                 r->in.req->req5.max_ndr_size            = 1336770;
483                 r->in.req->req5.extended_op             = state->op->extended_op;
484                 r->in.req->req5.fsmo_info               = state->op->fsmo_info;
485         }
486
487 #if 0
488         NDR_PRINT_IN_DEBUG(drsuapi_DsGetNCChanges, r);
489 #endif
490
491         state->ndr_struct_ptr = r;
492         subreq = dcerpc_drsuapi_DsGetNCChanges_r_send(state,
493                                                       state->ev,
494                                                       drsuapi->drsuapi_handle,
495                                                       r);
496         if (tevent_req_nomem(subreq, req)) {
497                 return;
498         }
499         tevent_req_set_callback(subreq, dreplsrv_op_pull_source_get_changes_done, req);
500 }
501
502 static void dreplsrv_op_pull_source_apply_changes_trigger(struct tevent_req *req,
503                                                           struct drsuapi_DsGetNCChanges *r,
504                                                           uint32_t ctr_level,
505                                                           struct drsuapi_DsGetNCChangesCtr1 *ctr1,
506                                                           struct drsuapi_DsGetNCChangesCtr6 *ctr6);
507
508 static void dreplsrv_op_pull_source_get_changes_done(struct tevent_req *subreq)
509 {
510         struct tevent_req *req = tevent_req_callback_data(subreq,
511                                  struct tevent_req);
512         struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
513                                                       struct dreplsrv_op_pull_source_state);
514         NTSTATUS status;
515         struct drsuapi_DsGetNCChanges *r = talloc_get_type(state->ndr_struct_ptr,
516                                            struct drsuapi_DsGetNCChanges);
517         uint32_t ctr_level = 0;
518         struct drsuapi_DsGetNCChangesCtr1 *ctr1 = NULL;
519         struct drsuapi_DsGetNCChangesCtr6 *ctr6 = NULL;
520         enum drsuapi_DsExtendedError extended_ret;
521         state->ndr_struct_ptr = NULL;
522
523         status = dcerpc_drsuapi_DsGetNCChanges_r_recv(subreq, r);
524         TALLOC_FREE(subreq);
525         if (tevent_req_nterror(req, status)) {
526                 return;
527         }
528
529         if (!W_ERROR_IS_OK(r->out.result)) {
530                 status = werror_to_ntstatus(r->out.result);
531                 tevent_req_nterror(req, status);
532                 return;
533         }
534
535         if (*r->out.level_out == 1) {
536                 ctr_level = 1;
537                 ctr1 = &r->out.ctr->ctr1;
538         } else if (*r->out.level_out == 2 &&
539                    r->out.ctr->ctr2.mszip1.ts) {
540                 ctr_level = 1;
541                 ctr1 = &r->out.ctr->ctr2.mszip1.ts->ctr1;
542         } else if (*r->out.level_out == 6) {
543                 ctr_level = 6;
544                 ctr6 = &r->out.ctr->ctr6;
545         } else if (*r->out.level_out == 7 &&
546                    r->out.ctr->ctr7.level == 6 &&
547                    r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_MSZIP &&
548                    r->out.ctr->ctr7.ctr.mszip6.ts) {
549                 ctr_level = 6;
550                 ctr6 = &r->out.ctr->ctr7.ctr.mszip6.ts->ctr6;
551         } else if (*r->out.level_out == 7 &&
552                    r->out.ctr->ctr7.level == 6 &&
553                    r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_XPRESS &&
554                    r->out.ctr->ctr7.ctr.xpress6.ts) {
555                 ctr_level = 6;
556                 ctr6 = &r->out.ctr->ctr7.ctr.xpress6.ts->ctr6;
557         } else {
558                 status = werror_to_ntstatus(WERR_BAD_NET_RESP);
559                 tevent_req_nterror(req, status);
560                 return;
561         }
562
563         if (!ctr1 && !ctr6) {
564                 status = werror_to_ntstatus(WERR_BAD_NET_RESP);
565                 tevent_req_nterror(req, status);
566                 return;
567         }
568
569         if (ctr_level == 6) {
570                 if (!W_ERROR_IS_OK(ctr6->drs_error)) {
571                         status = werror_to_ntstatus(ctr6->drs_error);
572                         tevent_req_nterror(req, status);
573                         return;
574                 }
575                 extended_ret = ctr6->extended_ret;
576         }
577
578         if (ctr_level == 1) {
579                 extended_ret = ctr1->extended_ret;
580         }
581
582         if (state->op->extended_op != DRSUAPI_EXOP_NONE) {
583                 state->op->extended_ret = extended_ret;
584
585                 if (extended_ret != DRSUAPI_EXOP_ERR_SUCCESS) {
586                         status = NT_STATUS_UNSUCCESSFUL;
587                         tevent_req_nterror(req, status);
588                         return;
589                 }
590         }
591
592         dreplsrv_op_pull_source_apply_changes_trigger(req, r, ctr_level, ctr1, ctr6);
593 }
594
595 static void dreplsrv_update_refs_trigger(struct tevent_req *req);
596
597 static void dreplsrv_op_pull_source_apply_changes_trigger(struct tevent_req *req,
598                                                           struct drsuapi_DsGetNCChanges *r,
599                                                           uint32_t ctr_level,
600                                                           struct drsuapi_DsGetNCChangesCtr1 *ctr1,
601                                                            struct drsuapi_DsGetNCChangesCtr6 *ctr6)
602 {
603         struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
604                                                       struct dreplsrv_op_pull_source_state);
605         struct repsFromTo1 rf1 = *state->op->source_dsa->repsFrom1;
606         struct dreplsrv_service *service = state->op->service;
607         struct dreplsrv_partition *partition = state->op->source_dsa->partition;
608         struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
609         struct dsdb_schema *schema;
610         struct dsdb_schema *working_schema = NULL;
611         const struct drsuapi_DsReplicaOIDMapping_Ctr *mapping_ctr;
612         uint32_t object_count;
613         struct drsuapi_DsReplicaObjectListItemEx *first_object;
614         uint32_t linked_attributes_count;
615         struct drsuapi_DsReplicaLinkedAttribute *linked_attributes;
616         const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
617         struct dsdb_extended_replicated_objects *objects;
618         bool more_data = false;
619         WERROR status;
620         NTSTATUS nt_status;
621         uint32_t dsdb_repl_flags = 0;
622
623         switch (ctr_level) {
624         case 1:
625                 mapping_ctr                     = &ctr1->mapping_ctr;
626                 object_count                    = ctr1->object_count;
627                 first_object                    = ctr1->first_object;
628                 linked_attributes_count         = 0;
629                 linked_attributes               = NULL;
630                 rf1.highwatermark               = ctr1->new_highwatermark;
631                 uptodateness_vector             = NULL; /* TODO: map it */
632                 more_data                       = ctr1->more_data;
633                 break;
634         case 6:
635                 mapping_ctr                     = &ctr6->mapping_ctr;
636                 object_count                    = ctr6->object_count;
637                 first_object                    = ctr6->first_object;
638                 linked_attributes_count         = ctr6->linked_attributes_count;
639                 linked_attributes               = ctr6->linked_attributes;
640                 rf1.highwatermark               = ctr6->new_highwatermark;
641                 uptodateness_vector             = ctr6->uptodateness_vector;
642                 more_data                       = ctr6->more_data;
643                 break;
644         default:
645                 nt_status = werror_to_ntstatus(WERR_BAD_NET_RESP);
646                 tevent_req_nterror(req, nt_status);
647                 return;
648         }
649
650         schema = dsdb_get_schema(service->samdb, NULL);
651         if (!schema) {
652                 DEBUG(0,(__location__ ": Schema is not loaded yet!\n"));
653                 tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
654                 return;
655         }
656
657         /*
658          * Decide what working schema to use for object conversion.
659          * We won't need a working schema for empty replicas sent.
660          */
661         if (first_object && ldb_dn_compare(partition->dn, schema->base_dn) == 0) {
662                 /* create working schema to convert objects with */
663                 status = dsdb_repl_make_working_schema(service->samdb,
664                                                        schema,
665                                                        mapping_ctr,
666                                                        object_count,
667                                                        first_object,
668                                                        &drsuapi->gensec_skey,
669                                                        state, &working_schema);
670                 if (!W_ERROR_IS_OK(status)) {
671                         DEBUG(0,("Failed to create working schema: %s\n",
672                                  win_errstr(status)));
673                         tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
674                         return;
675                 }
676         }
677
678         if (partition->partial_replica || partition->rodc_replica) {
679                 dsdb_repl_flags |= DSDB_REPL_FLAG_PARTIAL_REPLICA;
680         }
681         if (state->op->options & DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS) {
682                 dsdb_repl_flags |= DSDB_REPL_FLAG_PRIORITISE_INCOMING;
683         }
684
685         status = dsdb_replicated_objects_convert(service->samdb,
686                                                  working_schema ? working_schema : schema,
687                                                  partition->nc.dn,
688                                                  mapping_ctr,
689                                                  object_count,
690                                                  first_object,
691                                                  linked_attributes_count,
692                                                  linked_attributes,
693                                                  &rf1,
694                                                  uptodateness_vector,
695                                                  &drsuapi->gensec_skey,
696                                                  dsdb_repl_flags,
697                                                  state, &objects);
698         if (!W_ERROR_IS_OK(status)) {
699                 nt_status = werror_to_ntstatus(WERR_BAD_NET_RESP);
700                 DEBUG(0,("Failed to convert objects: %s/%s\n",
701                           win_errstr(status), nt_errstr(nt_status)));
702                 tevent_req_nterror(req, nt_status);
703                 return;
704         }
705
706         status = dsdb_replicated_objects_commit(service->samdb,
707                                                 working_schema,
708                                                 objects,
709                                                 &state->op->source_dsa->notify_uSN);
710         talloc_free(objects);
711         if (!W_ERROR_IS_OK(status)) {
712                 nt_status = werror_to_ntstatus(WERR_BAD_NET_RESP);
713                 DEBUG(0,("Failed to commit objects: %s/%s\n",
714                           win_errstr(status), nt_errstr(nt_status)));
715                 tevent_req_nterror(req, nt_status);
716                 return;
717         }
718
719         if (state->op->extended_op == DRSUAPI_EXOP_NONE) {
720                 /* if it applied fine, we need to update the highwatermark */
721                 *state->op->source_dsa->repsFrom1 = rf1;
722         }
723
724         /* we don't need this maybe very large structure anymore */
725         TALLOC_FREE(r);
726
727         if (more_data) {
728                 dreplsrv_op_pull_source_get_changes_trigger(req);
729                 return;
730         }
731
732         if (state->op->extended_op != DRSUAPI_EXOP_NONE ||
733             state->op->service->am_rodc) {
734                 /*
735                   we don't do the UpdateRefs for extended ops or if we
736                   are a RODC
737                  */
738                 tevent_req_done(req);
739                 return;
740         }
741
742         /* now we need to update the repsTo record for this partition
743            on the server. These records are initially established when
744            we join the domain, but they quickly expire.  We do it here
745            so we can use the already established DRSUAPI pipe
746         */
747         dreplsrv_update_refs_trigger(req);
748 }
749
750 static void dreplsrv_update_refs_done(struct tevent_req *subreq);
751
752 /*
753   send a UpdateRefs request to refresh our repsTo record on the server
754  */
755 static void dreplsrv_update_refs_trigger(struct tevent_req *req)
756 {
757         struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
758                                                       struct dreplsrv_op_pull_source_state);
759         struct dreplsrv_service *service = state->op->service;
760         struct dreplsrv_partition *partition = state->op->source_dsa->partition;
761         struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
762         struct drsuapi_DsReplicaUpdateRefs *r;
763         char *ntds_dns_name;
764         struct tevent_req *subreq;
765
766         r = talloc(state, struct drsuapi_DsReplicaUpdateRefs);
767         if (tevent_req_nomem(r, req)) {
768                 return;
769         }
770
771         ntds_dns_name = samdb_ntds_msdcs_dns_name(service->samdb, r, &service->ntds_guid);
772         if (tevent_req_nomem(ntds_dns_name, req)) {
773                 talloc_free(r);
774                 return;
775         }
776
777         r->in.bind_handle       = &drsuapi->bind_handle;
778         r->in.level             = 1;
779         r->in.req.req1.naming_context     = &partition->nc;
780         r->in.req.req1.dest_dsa_dns_name  = ntds_dns_name;
781         r->in.req.req1.dest_dsa_guid      = service->ntds_guid;
782         r->in.req.req1.options            = DRSUAPI_DRS_ADD_REF | DRSUAPI_DRS_DEL_REF;
783         if (!service->am_rodc) {
784                 r->in.req.req1.options |= DRSUAPI_DRS_WRIT_REP;
785         }
786
787         state->ndr_struct_ptr = r;
788         subreq = dcerpc_drsuapi_DsReplicaUpdateRefs_r_send(state,
789                                                            state->ev,
790                                                            drsuapi->drsuapi_handle,
791                                                            r);
792         if (tevent_req_nomem(subreq, req)) {
793                 talloc_free(r);
794                 return;
795         }
796         tevent_req_set_callback(subreq, dreplsrv_update_refs_done, req);
797 }
798
799 /*
800   receive a UpdateRefs reply
801  */
802 static void dreplsrv_update_refs_done(struct tevent_req *subreq)
803 {
804         struct tevent_req *req = tevent_req_callback_data(subreq,
805                                  struct tevent_req);
806         struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
807                                                       struct dreplsrv_op_pull_source_state);
808         struct drsuapi_DsReplicaUpdateRefs *r = talloc_get_type(state->ndr_struct_ptr,
809                                                                 struct drsuapi_DsReplicaUpdateRefs);
810         NTSTATUS status;
811
812         state->ndr_struct_ptr = NULL;
813
814         status = dcerpc_drsuapi_DsReplicaUpdateRefs_r_recv(subreq, r);
815         TALLOC_FREE(subreq);
816         if (!NT_STATUS_IS_OK(status)) {
817                 DEBUG(0,("UpdateRefs failed with %s\n", 
818                          nt_errstr(status)));
819                 tevent_req_nterror(req, status);
820                 return;
821         }
822
823         if (!W_ERROR_IS_OK(r->out.result)) {
824                 status = werror_to_ntstatus(r->out.result);
825                 DEBUG(0,("UpdateRefs failed with %s/%s for %s %s\n",
826                          win_errstr(r->out.result),
827                          nt_errstr(status),
828                          r->in.req.req1.dest_dsa_dns_name,
829                          r->in.req.req1.naming_context->dn));
830                 /*
831                  * TODO we are currently not sending the
832                  * DsReplicaUpdateRefs at the correct moment,
833                  * we do it just after a GetNcChanges which is
834                  * not always correct.
835                  * Especially when another DC is trying to demote
836                  * it will sends us a DsReplicaSync that will trigger a getNcChanges
837                  * this call will succeed but the DsRecplicaUpdateRefs that we send
838                  * just after will not because the DC is in a demote state and
839                  * will reply us a WERR_DS_DRA_BUSY, this error will cause us to
840                  * answer to the DsReplicaSync with a non OK status, the other DC
841                  * will stop the demote due to this error.
842                  * In order to cope with this we will for the moment concider
843                  * a DS_DRA_BUSY not as an error.
844                  * It's not ideal but it should not have a too huge impact for
845                  * running production as this error otherwise never happen and
846                  * due to the fact the send a DsReplicaUpdateRefs after each getNcChanges
847                  */
848                 if (!W_ERROR_EQUAL(werr, WERR_DS_DRA_BUSY)) {
849                         tevent_req_nterror(req, status);
850                         return;
851                 }
852         }
853
854         DEBUG(4,("UpdateRefs OK for %s %s\n", 
855                  r->in.req.req1.dest_dsa_dns_name,
856                  r->in.req.req1.naming_context->dn));
857
858         tevent_req_done(req);
859 }
860
861 WERROR dreplsrv_op_pull_source_recv(struct tevent_req *req)
862 {
863         NTSTATUS status;
864
865         if (tevent_req_is_nterror(req, &status)) {
866                 tevent_req_received(req);
867                 return ntstatus_to_werror(status);
868         }
869
870         tevent_req_received(req);
871         return WERR_OK;
872 }
873