s4-repl: added a periodic notification check to the repl task
[ira/wip.git] / source4 / dsdb / repl / drepl_out_helpers.c
1 /* 
2    Unix SMB/CIFS implementation.
3    DSDB replication service helper function for outgoing traffic
4    
5    Copyright (C) Stefan Metzmacher 2007
6     
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19    
20 */
21
22 #include "includes.h"
23 #include "dsdb/samdb/samdb.h"
24 #include "auth/auth.h"
25 #include "smbd/service.h"
26 #include "lib/events/events.h"
27 #include "lib/messaging/irpc.h"
28 #include "dsdb/repl/drepl_service.h"
29 #include "lib/ldb/include/ldb_errors.h"
30 #include "../lib/util/dlinklist.h"
31 #include "librpc/gen_ndr/ndr_misc.h"
32 #include "librpc/gen_ndr/ndr_drsuapi.h"
33 #include "librpc/gen_ndr/ndr_drsblobs.h"
34 #include "libcli/composite/composite.h"
35 #include "auth/gensec/gensec.h"
36 #include "param/param.h"
37
38 struct dreplsrv_out_drsuapi_state {
39         struct composite_context *creq;
40
41         struct dreplsrv_out_connection *conn;
42
43         struct dreplsrv_drsuapi_connection *drsuapi;
44
45         struct drsuapi_DsBindInfoCtr bind_info_ctr;
46         struct drsuapi_DsBind bind_r;
47 };
48
49 static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq);
50
51 struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn)
52 {
53         struct composite_context *c;
54         struct composite_context *creq;
55         struct dreplsrv_out_drsuapi_state *st;
56
57         c = composite_create(conn, conn->service->task->event_ctx);
58         if (c == NULL) return NULL;
59
60         st = talloc_zero(c, struct dreplsrv_out_drsuapi_state);
61         if (composite_nomem(st, c)) return c;
62
63         c->private_data = st;
64
65         st->creq        = c;
66         st->conn        = conn;
67         st->drsuapi     = conn->drsuapi;
68
69         if (st->drsuapi && !st->drsuapi->pipe->conn->dead) {
70                 composite_done(c);
71                 return c;
72         } else if (st->drsuapi && st->drsuapi->pipe->conn->dead) {
73                 talloc_free(st->drsuapi);
74                 conn->drsuapi = NULL;
75         }
76
77         st->drsuapi     = talloc_zero(st, struct dreplsrv_drsuapi_connection);
78         if (composite_nomem(st->drsuapi, c)) return c;
79
80         creq = dcerpc_pipe_connect_b_send(st, conn->binding, &ndr_table_drsuapi,
81                                           conn->service->system_session_info->credentials,
82                                           c->event_ctx, conn->service->task->lp_ctx);
83         composite_continue(c, creq, dreplsrv_out_drsuapi_connect_recv, st);
84
85         return c;
86 }
87
88 static void dreplsrv_out_drsuapi_bind_send(struct dreplsrv_out_drsuapi_state *st);
89
90 static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq)
91 {
92         struct dreplsrv_out_drsuapi_state *st = talloc_get_type(creq->async.private_data,
93                                                 struct dreplsrv_out_drsuapi_state);
94         struct composite_context *c = st->creq;
95
96         c->status = dcerpc_pipe_connect_b_recv(creq, st->drsuapi, &st->drsuapi->pipe);
97         if (!composite_is_ok(c)) return;
98
99         c->status = gensec_session_key(st->drsuapi->pipe->conn->security_state.generic_state,
100                                        &st->drsuapi->gensec_skey);
101         if (!composite_is_ok(c)) return;
102
103         dreplsrv_out_drsuapi_bind_send(st);
104 }
105
106 static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req);
107
108 static void dreplsrv_out_drsuapi_bind_send(struct dreplsrv_out_drsuapi_state *st)
109 {
110         struct composite_context *c = st->creq;
111         struct rpc_request *req;
112
113         st->bind_info_ctr.length        = 28;
114         st->bind_info_ctr.info.info28   = st->conn->service->bind_info28;
115
116         st->bind_r.in.bind_guid = &st->conn->service->ntds_guid;
117         st->bind_r.in.bind_info = &st->bind_info_ctr;
118         st->bind_r.out.bind_handle = &st->drsuapi->bind_handle;
119
120         req = dcerpc_drsuapi_DsBind_send(st->drsuapi->pipe, st, &st->bind_r);
121         composite_continue_rpc(c, req, dreplsrv_out_drsuapi_bind_recv, st);
122 }
123
124 static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req)
125 {
126         struct dreplsrv_out_drsuapi_state *st = talloc_get_type(req->async.private_data,
127                                                 struct dreplsrv_out_drsuapi_state);
128         struct composite_context *c = st->creq;
129
130         c->status = dcerpc_ndr_request_recv(req);
131         if (!composite_is_ok(c)) return;
132
133         if (!W_ERROR_IS_OK(st->bind_r.out.result)) {
134                 composite_error(c, werror_to_ntstatus(st->bind_r.out.result));
135                 return;
136         }
137
138         ZERO_STRUCT(st->drsuapi->remote_info28);
139         if (st->bind_r.out.bind_info) {
140                 switch (st->bind_r.out.bind_info->length) {
141                 case 24: {
142                         struct drsuapi_DsBindInfo24 *info24;
143                         info24 = &st->bind_r.out.bind_info->info.info24;
144                         st->drsuapi->remote_info28.supported_extensions = info24->supported_extensions;
145                         st->drsuapi->remote_info28.site_guid            = info24->site_guid;
146                         st->drsuapi->remote_info28.pid                  = info24->pid;
147                         st->drsuapi->remote_info28.repl_epoch           = 0;
148                         break;
149                 }
150                 case 48: {
151                         struct drsuapi_DsBindInfo48 *info48;
152                         info48 = &st->bind_r.out.bind_info->info.info48;
153                         st->drsuapi->remote_info28.supported_extensions = info48->supported_extensions;
154                         st->drsuapi->remote_info28.site_guid            = info48->site_guid;
155                         st->drsuapi->remote_info28.pid                  = info48->pid;
156                         st->drsuapi->remote_info28.repl_epoch           = info48->repl_epoch;
157                         break;
158                 }
159                 case 28:
160                         st->drsuapi->remote_info28 = st->bind_r.out.bind_info->info.info28;
161                         break;
162                 }
163         }
164
165         composite_done(c);
166 }
167
168 NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c)
169 {
170         NTSTATUS status;
171         struct dreplsrv_out_drsuapi_state *st = talloc_get_type(c->private_data,
172                                                 struct dreplsrv_out_drsuapi_state);
173
174         status = composite_wait(c);
175
176         if (NT_STATUS_IS_OK(status)) {
177                 st->conn->drsuapi = talloc_steal(st->conn, st->drsuapi);
178         }
179
180         talloc_free(c);
181         return status;
182 }
183
184 struct dreplsrv_op_pull_source_state {
185         struct composite_context *creq;
186
187         struct dreplsrv_out_operation *op;
188
189         struct dreplsrv_drsuapi_connection *drsuapi;
190
191         bool have_all;
192
193         uint32_t ctr_level;
194         struct drsuapi_DsGetNCChangesCtr1 *ctr1;
195         struct drsuapi_DsGetNCChangesCtr6 *ctr6;
196 };
197
198 static void dreplsrv_op_pull_source_connect_recv(struct composite_context *creq);
199
200 struct composite_context *dreplsrv_op_pull_source_send(struct dreplsrv_out_operation *op)
201 {
202         struct composite_context *c;
203         struct composite_context *creq;
204         struct dreplsrv_op_pull_source_state *st;
205
206         c = composite_create(op, op->service->task->event_ctx);
207         if (c == NULL) return NULL;
208
209         st = talloc_zero(c, struct dreplsrv_op_pull_source_state);
210         if (composite_nomem(st, c)) return c;
211
212         st->creq        = c;
213         st->op          = op;
214
215         creq = dreplsrv_out_drsuapi_send(op->source_dsa->conn);
216         composite_continue(c, creq, dreplsrv_op_pull_source_connect_recv, st);
217
218         return c;
219 }
220
221 static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st);
222
223 static void dreplsrv_op_pull_source_connect_recv(struct composite_context *creq)
224 {
225         struct dreplsrv_op_pull_source_state *st = talloc_get_type(creq->async.private_data,
226                                                    struct dreplsrv_op_pull_source_state);
227         struct composite_context *c = st->creq;
228
229         c->status = dreplsrv_out_drsuapi_recv(creq);
230         if (!composite_is_ok(c)) return;
231
232         dreplsrv_op_pull_source_get_changes_send(st);
233 }
234
235 static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req);
236
237 static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st)
238 {
239         struct composite_context *c = st->creq;
240         struct repsFromTo1 *rf1 = st->op->source_dsa->repsFrom1;
241         struct dreplsrv_service *service = st->op->service;
242         struct dreplsrv_partition *partition = st->op->source_dsa->partition;
243         struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
244         struct rpc_request *req;
245         struct drsuapi_DsGetNCChanges *r;
246
247         r = talloc(st, struct drsuapi_DsGetNCChanges);
248         if (composite_nomem(r, c)) return;
249
250         r->out.level_out = talloc(r, int32_t);
251         if (composite_nomem(r->out.level_out, c)) return;
252         r->in.req = talloc(r, union drsuapi_DsGetNCChangesRequest);
253         if (composite_nomem(r->in.req, c)) return;
254         r->out.ctr = talloc(r, union drsuapi_DsGetNCChangesCtr);
255         if (composite_nomem(r->out.ctr, c)) return;
256
257         r->in.bind_handle       = &drsuapi->bind_handle;
258         if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
259                 r->in.level                             = 8;
260                 r->in.req->req8.destination_dsa_guid    = service->ntds_guid;
261                 r->in.req->req8.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
262                 r->in.req->req8.naming_context          = &partition->nc;
263                 r->in.req->req8.highwatermark           = rf1->highwatermark;
264                 r->in.req->req8.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
265                 r->in.req->req8.replica_flags           = rf1->replica_flags;
266                 r->in.req->req8.max_object_count        = 133;
267                 r->in.req->req8.max_ndr_size            = 1336811;
268                 r->in.req->req8.extended_op             = DRSUAPI_EXOP_NONE;
269                 r->in.req->req8.fsmo_info               = 0;
270                 r->in.req->req8.partial_attribute_set   = NULL;
271                 r->in.req->req8.partial_attribute_set_ex= NULL;
272                 r->in.req->req8.mapping_ctr.num_mappings= 0;
273                 r->in.req->req8.mapping_ctr.mappings    = NULL;
274         } else {
275                 r->in.level                             = 5;
276                 r->in.req->req5.destination_dsa_guid    = service->ntds_guid;
277                 r->in.req->req5.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
278                 r->in.req->req5.naming_context          = &partition->nc;
279                 r->in.req->req5.highwatermark           = rf1->highwatermark;
280                 r->in.req->req5.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
281                 r->in.req->req5.replica_flags           = rf1->replica_flags;
282                 r->in.req->req5.max_object_count        = 133;
283                 r->in.req->req5.max_ndr_size            = 1336770;
284                 r->in.req->req5.extended_op             = DRSUAPI_EXOP_NONE;
285                 r->in.req->req5.fsmo_info               = 0;
286         }
287
288         req = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
289         composite_continue_rpc(c, req, dreplsrv_op_pull_source_get_changes_recv, st);
290 }
291
292 static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
293                                                        struct drsuapi_DsGetNCChanges *r,
294                                                        uint32_t ctr_level,
295                                                        struct drsuapi_DsGetNCChangesCtr1 *ctr1,
296                                                        struct drsuapi_DsGetNCChangesCtr6 *ctr6);
297
298 static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req)
299 {
300         struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
301                                                    struct dreplsrv_op_pull_source_state);
302         struct composite_context *c = st->creq;
303         struct drsuapi_DsGetNCChanges *r = talloc_get_type(req->ndr.struct_ptr,
304                                            struct drsuapi_DsGetNCChanges);
305         uint32_t ctr_level = 0;
306         struct drsuapi_DsGetNCChangesCtr1 *ctr1 = NULL;
307         struct drsuapi_DsGetNCChangesCtr6 *ctr6 = NULL;
308
309         c->status = dcerpc_ndr_request_recv(req);
310         if (!composite_is_ok(c)) return;
311
312         if (!W_ERROR_IS_OK(r->out.result)) {
313                 composite_error(c, werror_to_ntstatus(r->out.result));
314                 return;
315         }
316
317         if (*r->out.level_out == 1) {
318                 ctr_level = 1;
319                 ctr1 = &r->out.ctr->ctr1;
320         } else if (*r->out.level_out == 2 &&
321                    r->out.ctr->ctr2.mszip1.ts) {
322                 ctr_level = 1;
323                 ctr1 = &r->out.ctr->ctr2.mszip1.ts->ctr1;
324         } else if (*r->out.level_out == 6) {
325                 ctr_level = 6;
326                 ctr6 = &r->out.ctr->ctr6;
327         } else if (*r->out.level_out == 7 &&
328                    r->out.ctr->ctr7.level == 6 &&
329                    r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_MSZIP &&
330                    r->out.ctr->ctr7.ctr.mszip6.ts) {
331                 ctr_level = 6;
332                 ctr6 = &r->out.ctr->ctr7.ctr.mszip6.ts->ctr6;
333         } else if (*r->out.level_out == 7 &&
334                    r->out.ctr->ctr7.level == 6 &&
335                    r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_XPRESS &&
336                    r->out.ctr->ctr7.ctr.xpress6.ts) {
337                 ctr_level = 6;
338                 ctr6 = &r->out.ctr->ctr7.ctr.xpress6.ts->ctr6;
339         } else {
340                 composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
341                 return;
342         }
343
344         if (!ctr1 && !ctr6) {
345                 composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
346                 return;
347         }
348
349         if (ctr_level == 6) {
350                 if (!W_ERROR_IS_OK(ctr6->drs_error)) {
351                         composite_error(c, werror_to_ntstatus(ctr6->drs_error));
352                         return;
353                 }
354         }
355
356         dreplsrv_op_pull_source_apply_changes_send(st, r, ctr_level, ctr1, ctr6);
357 }
358
359 static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st);
360
361 static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
362                                                        struct drsuapi_DsGetNCChanges *r,
363                                                        uint32_t ctr_level,
364                                                        struct drsuapi_DsGetNCChangesCtr1 *ctr1,
365                                                        struct drsuapi_DsGetNCChangesCtr6 *ctr6)
366 {
367         struct composite_context *c = st->creq;
368         struct repsFromTo1 rf1 = *st->op->source_dsa->repsFrom1;
369         struct dreplsrv_service *service = st->op->service;
370         struct dreplsrv_partition *partition = st->op->source_dsa->partition;
371         struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
372         const struct drsuapi_DsReplicaOIDMapping_Ctr *mapping_ctr;
373         uint32_t object_count;
374         struct drsuapi_DsReplicaObjectListItemEx *first_object;
375         uint32_t linked_attributes_count;
376         struct drsuapi_DsReplicaLinkedAttribute *linked_attributes;
377         const struct drsuapi_DsReplicaCursor2CtrEx *uptodateness_vector;
378         bool more_data = false;
379         WERROR status;
380
381         switch (ctr_level) {
382         case 1:
383                 mapping_ctr                     = &ctr1->mapping_ctr;
384                 object_count                    = ctr1->object_count;
385                 first_object                    = ctr1->first_object;
386                 linked_attributes_count         = 0;
387                 linked_attributes               = NULL;
388                 rf1.highwatermark               = ctr1->new_highwatermark;
389                 uptodateness_vector             = NULL; /* TODO: map it */
390                 more_data                       = ctr1->more_data;
391                 break;
392         case 6:
393                 mapping_ctr                     = &ctr6->mapping_ctr;
394                 object_count                    = ctr6->object_count;
395                 first_object                    = ctr6->first_object;
396                 linked_attributes_count         = ctr6->linked_attributes_count;
397                 linked_attributes               = ctr6->linked_attributes;
398                 rf1.highwatermark               = ctr6->new_highwatermark;
399                 uptodateness_vector             = ctr6->uptodateness_vector;
400                 more_data                       = ctr6->more_data;
401                 break;
402         default:
403                 composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
404                 return;
405         }
406
407         status = dsdb_extended_replicated_objects_commit(service->samdb,
408                                                          partition->nc.dn,
409                                                          mapping_ctr,
410                                                          object_count,
411                                                          first_object,
412                                                          linked_attributes_count,
413                                                          linked_attributes,
414                                                          &rf1,
415                                                          uptodateness_vector,
416                                                          &drsuapi->gensec_skey,
417                                                          st, NULL);
418         if (!W_ERROR_IS_OK(status)) {
419                 DEBUG(0,("Failed to commit objects: %s\n", win_errstr(status)));
420                 composite_error(c, werror_to_ntstatus(status));
421                 return;
422         }
423
424         /* if it applied fine, we need to update the highwatermark */
425         *st->op->source_dsa->repsFrom1 = rf1;
426
427         /*
428          * TODO: update our uptodatevector!
429          */
430
431         if (more_data) {
432                 dreplsrv_op_pull_source_get_changes_send(st);
433                 return;
434         }
435
436         /* now we need to update the repsTo record for this partition
437            on the server. These records are initially established when
438            we join the domain, but they quickly expire.  We do it here
439            so we can use the already established DRSUAPI pipe
440         */
441         dreplsrv_update_refs_send(st);
442 }
443
444 WERROR dreplsrv_op_pull_source_recv(struct composite_context *c)
445 {
446         NTSTATUS status;
447
448         status = composite_wait(c);
449
450         talloc_free(c);
451         return ntstatus_to_werror(status);
452 }
453
454 /*
455   receive a UpdateRefs reply
456  */
457 static void dreplsrv_update_refs_recv(struct rpc_request *req)
458 {
459         struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
460                                                    struct dreplsrv_op_pull_source_state);
461         struct composite_context *c = st->creq;
462         struct drsuapi_DsReplicaUpdateRefs *r = talloc_get_type(req->ndr.struct_ptr,
463                                                                 struct drsuapi_DsReplicaUpdateRefs);
464
465         c->status = dcerpc_ndr_request_recv(req);
466         if (!composite_is_ok(c)) {
467                 DEBUG(0,("UpdateRefs failed with %s\n", 
468                          nt_errstr(c->status)));
469                 return;
470         }
471
472         if (!W_ERROR_IS_OK(r->out.result)) {
473                 DEBUG(0,("UpdateRefs failed with %s for %s %s\n", 
474                          win_errstr(r->out.result),
475                          r->in.req.req1.dest_dsa_dns_name,
476                          r->in.req.req1.naming_context->dn));
477                 composite_error(c, werror_to_ntstatus(r->out.result));
478                 return;
479         }
480
481         DEBUG(4,("UpdateRefs OK for %s %s\n", 
482                  r->in.req.req1.dest_dsa_dns_name,
483                  r->in.req.req1.naming_context->dn));
484
485         composite_done(c);
486 }
487
488 /*
489   send a UpdateRefs request to refresh our repsTo record on the server
490  */
491 static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st)
492 {
493         struct composite_context *c = st->creq;
494         struct dreplsrv_service *service = st->op->service;
495         struct dreplsrv_partition *partition = st->op->source_dsa->partition;
496         struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
497         struct rpc_request *req;
498         struct drsuapi_DsReplicaUpdateRefs *r;
499         char *ntds_guid_str;
500         char *ntds_dns_name;
501
502         r = talloc(st, struct drsuapi_DsReplicaUpdateRefs);
503         if (composite_nomem(r, c)) return;
504
505         ntds_guid_str = GUID_string(r, &service->ntds_guid);
506         if (composite_nomem(ntds_guid_str, c)) return;
507
508         /* lp_realm() is not really right here */
509         ntds_dns_name = talloc_asprintf(r, "%s._msdcs.%s",
510                                         ntds_guid_str,
511                                         lp_realm(service->task->lp_ctx));
512         if (composite_nomem(ntds_dns_name, c)) return;
513
514         r->in.bind_handle       = &drsuapi->bind_handle;
515         r->in.level             = 1;
516         r->in.req.req1.naming_context     = &partition->nc;
517         r->in.req.req1.dest_dsa_dns_name  = ntds_dns_name;
518         r->in.req.req1.dest_dsa_guid      = service->ntds_guid;
519         r->in.req.req1.options            = 
520                 DRSUAPI_DS_REPLICA_UPDATE_ADD_REFERENCE |
521                 DRSUAPI_DS_REPLICA_UPDATE_DELETE_REFERENCE;
522         if (!lp_parm_bool(service->task->lp_ctx, NULL, "repl", "RODC", false)) {
523                 r->in.req.req1.options |= DRSUAPI_DS_REPLICA_UPDATE_WRITEABLE;
524         }
525
526         req = dcerpc_drsuapi_DsReplicaUpdateRefs_send(drsuapi->pipe, r, r);
527         composite_continue_rpc(c, req, dreplsrv_update_refs_recv, st);
528 }