s4:dsdb/repl Split the 'convert' and 'commit' stages in the DRS import
[samba.git] / source4 / dsdb / repl / drepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    DSDB replication service helper function for outgoing traffic
4    
5    Copyright (C) Stefan Metzmacher 2007
6     
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19    
20 */
21
22 #include "includes.h"
23 #include "dsdb/samdb/samdb.h"
24 #include "auth/auth.h"
25 #include "smbd/service.h"
26 #include "lib/events/events.h"
27 #include "lib/messaging/irpc.h"
28 #include "dsdb/repl/drepl_service.h"
29 #include "lib/ldb/include/ldb_errors.h"
30 #include "../lib/util/dlinklist.h"
31 #include "librpc/gen_ndr/ndr_misc.h"
32 #include "librpc/gen_ndr/ndr_drsuapi.h"
33 #include "librpc/gen_ndr/ndr_drsblobs.h"
34 #include "libcli/composite/composite.h"
35 #include "auth/gensec/gensec.h"
36 #include "param/param.h"
37
38 struct dreplsrv_out_drsuapi_state {
39         struct composite_context *creq;
40
41         struct dreplsrv_out_connection *conn;
42
43         struct dreplsrv_drsuapi_connection *drsuapi;
44
45         struct drsuapi_DsBindInfoCtr bind_info_ctr;
46         struct drsuapi_DsBind bind_r;
47 };
48
49 static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq);
50
51 struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn)
52 {
53         struct composite_context *c;
54         struct composite_context *creq;
55         struct dreplsrv_out_drsuapi_state *st;
56
57         c = composite_create(conn, conn->service->task->event_ctx);
58         if (c == NULL) return NULL;
59
60         st = talloc_zero(c, struct dreplsrv_out_drsuapi_state);
61         if (composite_nomem(st, c)) return c;
62
63         c->private_data = st;
64
65         st->creq        = c;
66         st->conn        = conn;
67         st->drsuapi     = conn->drsuapi;
68
69         if (st->drsuapi && !st->drsuapi->pipe->conn->dead) {
70                 composite_done(c);
71                 return c;
72         } else if (st->drsuapi && st->drsuapi->pipe->conn->dead) {
73                 talloc_free(st->drsuapi);
74                 conn->drsuapi = NULL;
75         }
76
77         st->drsuapi     = talloc_zero(st, struct dreplsrv_drsuapi_connection);
78         if (composite_nomem(st->drsuapi, c)) return c;
79
80         creq = dcerpc_pipe_connect_b_send(st, conn->binding, &ndr_table_drsuapi,
81                                           conn->service->system_session_info->credentials,
82                                           c->event_ctx, conn->service->task->lp_ctx);
83         composite_continue(c, creq, dreplsrv_out_drsuapi_connect_recv, st);
84
85         return c;
86 }
87
88 static void dreplsrv_out_drsuapi_bind_send(struct dreplsrv_out_drsuapi_state *st);
89
90 static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq)
91 {
92         struct dreplsrv_out_drsuapi_state *st = talloc_get_type(creq->async.private_data,
93                                                 struct dreplsrv_out_drsuapi_state);
94         struct composite_context *c = st->creq;
95
96         c->status = dcerpc_pipe_connect_b_recv(creq, st->drsuapi, &st->drsuapi->pipe);
97         if (!composite_is_ok(c)) return;
98
99         c->status = gensec_session_key(st->drsuapi->pipe->conn->security_state.generic_state,
100                                        &st->drsuapi->gensec_skey);
101         if (!composite_is_ok(c)) return;
102
103         dreplsrv_out_drsuapi_bind_send(st);
104 }
105
106 static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req);
107
108 static void dreplsrv_out_drsuapi_bind_send(struct dreplsrv_out_drsuapi_state *st)
109 {
110         struct composite_context *c = st->creq;
111         struct rpc_request *req;
112
113         st->bind_info_ctr.length        = 28;
114         st->bind_info_ctr.info.info28   = st->conn->service->bind_info28;
115
116         st->bind_r.in.bind_guid = &st->conn->service->ntds_guid;
117         st->bind_r.in.bind_info = &st->bind_info_ctr;
118         st->bind_r.out.bind_handle = &st->drsuapi->bind_handle;
119
120         req = dcerpc_drsuapi_DsBind_send(st->drsuapi->pipe, st, &st->bind_r);
121         composite_continue_rpc(c, req, dreplsrv_out_drsuapi_bind_recv, st);
122 }
123
124 static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req)
125 {
126         struct dreplsrv_out_drsuapi_state *st = talloc_get_type(req->async.private_data,
127                                                 struct dreplsrv_out_drsuapi_state);
128         struct composite_context *c = st->creq;
129
130         c->status = dcerpc_ndr_request_recv(req);
131         if (!composite_is_ok(c)) return;
132
133         if (!W_ERROR_IS_OK(st->bind_r.out.result)) {
134                 composite_error(c, werror_to_ntstatus(st->bind_r.out.result));
135                 return;
136         }
137
138         ZERO_STRUCT(st->drsuapi->remote_info28);
139         if (st->bind_r.out.bind_info) {
140                 switch (st->bind_r.out.bind_info->length) {
141                 case 24: {
142                         struct drsuapi_DsBindInfo24 *info24;
143                         info24 = &st->bind_r.out.bind_info->info.info24;
144                         st->drsuapi->remote_info28.supported_extensions = info24->supported_extensions;
145                         st->drsuapi->remote_info28.site_guid            = info24->site_guid;
146                         st->drsuapi->remote_info28.pid                  = info24->pid;
147                         st->drsuapi->remote_info28.repl_epoch           = 0;
148                         break;
149                 }
150                 case 48: {
151                         struct drsuapi_DsBindInfo48 *info48;
152                         info48 = &st->bind_r.out.bind_info->info.info48;
153                         st->drsuapi->remote_info28.supported_extensions = info48->supported_extensions;
154                         st->drsuapi->remote_info28.site_guid            = info48->site_guid;
155                         st->drsuapi->remote_info28.pid                  = info48->pid;
156                         st->drsuapi->remote_info28.repl_epoch           = info48->repl_epoch;
157                         break;
158                 }
159                 case 28:
160                         st->drsuapi->remote_info28 = st->bind_r.out.bind_info->info.info28;
161                         break;
162                 }
163         }
164
165         composite_done(c);
166 }
167
168 NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c)
169 {
170         NTSTATUS status;
171         struct dreplsrv_out_drsuapi_state *st = talloc_get_type(c->private_data,
172                                                 struct dreplsrv_out_drsuapi_state);
173
174         status = composite_wait(c);
175
176         if (NT_STATUS_IS_OK(status)) {
177                 st->conn->drsuapi = talloc_steal(st->conn, st->drsuapi);
178         }
179
180         talloc_free(c);
181         return status;
182 }
183
/*
  state of one pull replication operation against a source DSA
 */
struct dreplsrv_op_pull_source_state {
	/* composite context used to report the overall result */
	struct composite_context *creq;

	/* the operation that is being processed */
	struct dreplsrv_out_operation *op;

	/*
	 * NOTE(review): the members below are not accessed by any of
	 * the functions in this part of the file
	 */
	struct dreplsrv_drsuapi_connection *drsuapi;

	bool have_all;

	uint32_t ctr_level;
	struct drsuapi_DsGetNCChangesCtr1 *ctr1;
	struct drsuapi_DsGetNCChangesCtr6 *ctr6;
};
197
198 static void dreplsrv_op_pull_source_connect_recv(struct composite_context *creq);
199
200 struct composite_context *dreplsrv_op_pull_source_send(struct dreplsrv_out_operation *op)
201 {
202         struct composite_context *c;
203         struct composite_context *creq;
204         struct dreplsrv_op_pull_source_state *st;
205
206         c = composite_create(op, op->service->task->event_ctx);
207         if (c == NULL) return NULL;
208
209         st = talloc_zero(c, struct dreplsrv_op_pull_source_state);
210         if (composite_nomem(st, c)) return c;
211
212         st->creq        = c;
213         st->op          = op;
214
215         creq = dreplsrv_out_drsuapi_send(op->source_dsa->conn);
216         composite_continue(c, creq, dreplsrv_op_pull_source_connect_recv, st);
217
218         return c;
219 }
220
221 static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st);
222
223 static void dreplsrv_op_pull_source_connect_recv(struct composite_context *creq)
224 {
225         struct dreplsrv_op_pull_source_state *st = talloc_get_type(creq->async.private_data,
226                                                    struct dreplsrv_op_pull_source_state);
227         struct composite_context *c = st->creq;
228
229         c->status = dreplsrv_out_drsuapi_recv(creq);
230         if (!composite_is_ok(c)) return;
231
232         dreplsrv_op_pull_source_get_changes_send(st);
233 }
234
235 static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req);
236
237 static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st)
238 {
239         struct composite_context *c = st->creq;
240         struct repsFromTo1 *rf1 = st->op->source_dsa->repsFrom1;
241         struct dreplsrv_service *service = st->op->service;
242         struct dreplsrv_partition *partition = st->op->source_dsa->partition;
243         struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
244         struct rpc_request *req;
245         struct drsuapi_DsGetNCChanges *r;
246
247         r = talloc(st, struct drsuapi_DsGetNCChanges);
248         if (composite_nomem(r, c)) return;
249
250         r->out.level_out = talloc(r, int32_t);
251         if (composite_nomem(r->out.level_out, c)) return;
252         r->in.req = talloc(r, union drsuapi_DsGetNCChangesRequest);
253         if (composite_nomem(r->in.req, c)) return;
254         r->out.ctr = talloc(r, union drsuapi_DsGetNCChangesCtr);
255         if (composite_nomem(r->out.ctr, c)) return;
256
257         r->in.bind_handle       = &drsuapi->bind_handle;
258         if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
259                 r->in.level                             = 8;
260                 r->in.req->req8.destination_dsa_guid    = service->ntds_guid;
261                 r->in.req->req8.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
262                 r->in.req->req8.naming_context          = &partition->nc;
263                 r->in.req->req8.highwatermark           = rf1->highwatermark;
264                 r->in.req->req8.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
265                 r->in.req->req8.replica_flags           = rf1->replica_flags;
266                 r->in.req->req8.max_object_count        = 133;
267                 r->in.req->req8.max_ndr_size            = 1336811;
268                 r->in.req->req8.extended_op             = DRSUAPI_EXOP_NONE;
269                 r->in.req->req8.fsmo_info               = 0;
270                 r->in.req->req8.partial_attribute_set   = NULL;
271                 r->in.req->req8.partial_attribute_set_ex= NULL;
272                 r->in.req->req8.mapping_ctr.num_mappings= 0;
273                 r->in.req->req8.mapping_ctr.mappings    = NULL;
274         } else {
275                 r->in.level                             = 5;
276                 r->in.req->req5.destination_dsa_guid    = service->ntds_guid;
277                 r->in.req->req5.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
278                 r->in.req->req5.naming_context          = &partition->nc;
279                 r->in.req->req5.highwatermark           = rf1->highwatermark;
280                 r->in.req->req5.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
281                 r->in.req->req5.replica_flags           = rf1->replica_flags;
282                 r->in.req->req5.max_object_count        = 133;
283                 r->in.req->req5.max_ndr_size            = 1336770;
284                 r->in.req->req5.extended_op             = DRSUAPI_EXOP_NONE;
285                 r->in.req->req5.fsmo_info               = 0;
286         }
287
288         req = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
289         composite_continue_rpc(c, req, dreplsrv_op_pull_source_get_changes_recv, st);
290 }
291
292 static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
293                                                        struct drsuapi_DsGetNCChanges *r,
294                                                        uint32_t ctr_level,
295                                                        struct drsuapi_DsGetNCChangesCtr1 *ctr1,
296                                                        struct drsuapi_DsGetNCChangesCtr6 *ctr6);
297
298 static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req)
299 {
300         struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
301                                                    struct dreplsrv_op_pull_source_state);
302         struct composite_context *c = st->creq;
303         struct drsuapi_DsGetNCChanges *r = talloc_get_type(req->ndr.struct_ptr,
304                                            struct drsuapi_DsGetNCChanges);
305         uint32_t ctr_level = 0;
306         struct drsuapi_DsGetNCChangesCtr1 *ctr1 = NULL;
307         struct drsuapi_DsGetNCChangesCtr6 *ctr6 = NULL;
308
309         c->status = dcerpc_ndr_request_recv(req);
310         if (!composite_is_ok(c)) return;
311
312         if (!W_ERROR_IS_OK(r->out.result)) {
313                 composite_error(c, werror_to_ntstatus(r->out.result));
314                 return;
315         }
316
317         if (*r->out.level_out == 1) {
318                 ctr_level = 1;
319                 ctr1 = &r->out.ctr->ctr1;
320         } else if (*r->out.level_out == 2 &&
321                    r->out.ctr->ctr2.mszip1.ts) {
322                 ctr_level = 1;
323                 ctr1 = &r->out.ctr->ctr2.mszip1.ts->ctr1;
324         } else if (*r->out.level_out == 6) {
325                 ctr_level = 6;
326                 ctr6 = &r->out.ctr->ctr6;
327         } else if (*r->out.level_out == 7 &&
328                    r->out.ctr->ctr7.level == 6 &&
329                    r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_MSZIP &&
330                    r->out.ctr->ctr7.ctr.mszip6.ts) {
331                 ctr_level = 6;
332                 ctr6 = &r->out.ctr->ctr7.ctr.mszip6.ts->ctr6;
333         } else if (*r->out.level_out == 7 &&
334                    r->out.ctr->ctr7.level == 6 &&
335                    r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_XPRESS &&
336                    r->out.ctr->ctr7.ctr.xpress6.ts) {
337                 ctr_level = 6;
338                 ctr6 = &r->out.ctr->ctr7.ctr.xpress6.ts->ctr6;
339         } else {
340                 composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
341                 return;
342         }
343
344         if (!ctr1 && !ctr6) {
345                 composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
346                 return;
347         }
348
349         if (ctr_level == 6) {
350                 if (!W_ERROR_IS_OK(ctr6->drs_error)) {
351                         composite_error(c, werror_to_ntstatus(ctr6->drs_error));
352                         return;
353                 }
354         }
355
356         dreplsrv_op_pull_source_apply_changes_send(st, r, ctr_level, ctr1, ctr6);
357 }
358
359 static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st);
360
361 static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
362                                                        struct drsuapi_DsGetNCChanges *r,
363                                                        uint32_t ctr_level,
364                                                        struct drsuapi_DsGetNCChangesCtr1 *ctr1,
365                                                        struct drsuapi_DsGetNCChangesCtr6 *ctr6)
366 {
367         struct composite_context *c = st->creq;
368         struct repsFromTo1 rf1 = *st->op->source_dsa->repsFrom1;
369         struct dreplsrv_service *service = st->op->service;
370         struct dreplsrv_partition *partition = st->op->source_dsa->partition;
371         struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
372         const struct drsuapi_DsReplicaOIDMapping_Ctr *mapping_ctr;
373         uint32_t object_count;
374         struct drsuapi_DsReplicaObjectListItemEx *first_object;
375         uint32_t linked_attributes_count;
376         struct drsuapi_DsReplicaLinkedAttribute *linked_attributes;
377         const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
378         struct dsdb_extended_replicated_objects *objects;
379         bool more_data = false;
380         WERROR status;
381
382         switch (ctr_level) {
383         case 1:
384                 mapping_ctr                     = &ctr1->mapping_ctr;
385                 object_count                    = ctr1->object_count;
386                 first_object                    = ctr1->first_object;
387                 linked_attributes_count         = 0;
388                 linked_attributes               = NULL;
389                 rf1.highwatermark               = ctr1->new_highwatermark;
390                 uptodateness_vector             = NULL; /* TODO: map it */
391                 more_data                       = ctr1->more_data;
392                 break;
393         case 6:
394                 mapping_ctr                     = &ctr6->mapping_ctr;
395                 object_count                    = ctr6->object_count;
396                 first_object                    = ctr6->first_object;
397                 linked_attributes_count         = ctr6->linked_attributes_count;
398                 linked_attributes               = ctr6->linked_attributes;
399                 rf1.highwatermark               = ctr6->new_highwatermark;
400                 uptodateness_vector             = ctr6->uptodateness_vector;
401                 more_data                       = ctr6->more_data;
402                 break;
403         default:
404                 composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
405                 return;
406         }
407
408         status = dsdb_extended_replicated_objects_convert(service->samdb,
409                                                           partition->nc.dn,
410                                                           mapping_ctr,
411                                                           object_count,
412                                                           first_object,
413                                                           linked_attributes_count,
414                                                           linked_attributes,
415                                                           &rf1,
416                                                           uptodateness_vector,
417                                                           &drsuapi->gensec_skey,
418                                                           st, &objects);
419         if (!W_ERROR_IS_OK(status)) {
420                 DEBUG(0,("Failed to convert objects: %s\n", win_errstr(status)));
421                 composite_error(c, werror_to_ntstatus(status));
422                 return;
423         }
424
425         status = dsdb_extended_replicated_objects_commit(service->samdb,
426                                                          objects, 
427                                                          &st->op->source_dsa->notify_uSN);
428         talloc_free(objects);
429         if (!W_ERROR_IS_OK(status)) {
430                 DEBUG(0,("Failed to commit objects: %s\n", win_errstr(status)));
431                 composite_error(c, werror_to_ntstatus(status));
432                 return;
433         }
434
435         /* if it applied fine, we need to update the highwatermark */
436         *st->op->source_dsa->repsFrom1 = rf1;
437
438         /*
439          * TODO: update our uptodatevector!
440          */
441
442         if (more_data) {
443                 dreplsrv_op_pull_source_get_changes_send(st);
444                 return;
445         }
446
447         /* now we need to update the repsTo record for this partition
448            on the server. These records are initially established when
449            we join the domain, but they quickly expire.  We do it here
450            so we can use the already established DRSUAPI pipe
451         */
452         dreplsrv_update_refs_send(st);
453 }
454
455 WERROR dreplsrv_op_pull_source_recv(struct composite_context *c)
456 {
457         NTSTATUS status;
458
459         status = composite_wait(c);
460
461         talloc_free(c);
462         return ntstatus_to_werror(status);
463 }
464
465 /*
466   receive a UpdateRefs reply
467  */
468 static void dreplsrv_update_refs_recv(struct rpc_request *req)
469 {
470         struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
471                                                    struct dreplsrv_op_pull_source_state);
472         struct composite_context *c = st->creq;
473         struct drsuapi_DsReplicaUpdateRefs *r = talloc_get_type(req->ndr.struct_ptr,
474                                                                 struct drsuapi_DsReplicaUpdateRefs);
475
476         c->status = dcerpc_ndr_request_recv(req);
477         if (!composite_is_ok(c)) {
478                 DEBUG(0,("UpdateRefs failed with %s\n", 
479                          nt_errstr(c->status)));
480                 return;
481         }
482
483         if (!W_ERROR_IS_OK(r->out.result)) {
484                 DEBUG(0,("UpdateRefs failed with %s for %s %s\n", 
485                          win_errstr(r->out.result),
486                          r->in.req.req1.dest_dsa_dns_name,
487                          r->in.req.req1.naming_context->dn));
488                 composite_error(c, werror_to_ntstatus(r->out.result));
489                 return;
490         }
491
492         DEBUG(4,("UpdateRefs OK for %s %s\n", 
493                  r->in.req.req1.dest_dsa_dns_name,
494                  r->in.req.req1.naming_context->dn));
495
496         composite_done(c);
497 }
498
499 /*
500   send a UpdateRefs request to refresh our repsTo record on the server
501  */
502 static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st)
503 {
504         struct composite_context *c = st->creq;
505         struct dreplsrv_service *service = st->op->service;
506         struct dreplsrv_partition *partition = st->op->source_dsa->partition;
507         struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
508         struct rpc_request *req;
509         struct drsuapi_DsReplicaUpdateRefs *r;
510         char *ntds_guid_str;
511         char *ntds_dns_name;
512
513         r = talloc(st, struct drsuapi_DsReplicaUpdateRefs);
514         if (composite_nomem(r, c)) return;
515
516         ntds_guid_str = GUID_string(r, &service->ntds_guid);
517         if (composite_nomem(ntds_guid_str, c)) return;
518
519         ntds_dns_name = talloc_asprintf(r, "%s._msdcs.%s",
520                                         ntds_guid_str,
521                                         lp_dnsdomain(service->task->lp_ctx));
522         if (composite_nomem(ntds_dns_name, c)) return;
523
524         r->in.bind_handle       = &drsuapi->bind_handle;
525         r->in.level             = 1;
526         r->in.req.req1.naming_context     = &partition->nc;
527         r->in.req.req1.dest_dsa_dns_name  = ntds_dns_name;
528         r->in.req.req1.dest_dsa_guid      = service->ntds_guid;
529         r->in.req.req1.options            = 
530                 DRSUAPI_DS_REPLICA_UPDATE_ADD_REFERENCE |
531                 DRSUAPI_DS_REPLICA_UPDATE_DELETE_REFERENCE;
532         if (!samdb_rodc(service->task->lp_ctx)) {
533                 r->in.req.req1.options |= DRSUAPI_DS_REPLICA_UPDATE_WRITEABLE;
534         }
535
536         req = dcerpc_drsuapi_DsReplicaUpdateRefs_send(drsuapi->pipe, r, r);
537         composite_continue_rpc(c, req, dreplsrv_update_refs_recv, st);
538 }