s4-drs: Store uSNUrgent for Urgent Replication
[ira/wip.git] / source4 / dsdb / repl / drepl_notify.c
1 /* 
2    Unix SMB/CIFS mplementation.
3
4    DSDB replication service periodic notification handling
5    
6    Copyright (C) Andrew Tridgell 2009
7    based on drepl_periodic
8     
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21    
22 */
23
24 #include "includes.h"
25 #include "lib/events/events.h"
26 #include "dsdb/samdb/samdb.h"
27 #include "auth/auth.h"
28 #include "smbd/service.h"
29 #include "lib/messaging/irpc.h"
30 #include "dsdb/repl/drepl_service.h"
31 #include "lib/ldb/include/ldb_errors.h"
32 #include "../lib/util/dlinklist.h"
33 #include "librpc/gen_ndr/ndr_misc.h"
34 #include "librpc/gen_ndr/ndr_drsuapi.h"
35 #include "librpc/gen_ndr/ndr_drsblobs.h"
36 #include "libcli/composite/composite.h"
37 #include "../lib/util/tevent_ntstatus.h"
38
39
40 struct dreplsrv_op_notify_state {
41         struct dreplsrv_notify_operation *op;
42 };
43
44 static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq);
45
46 /*
47   start the ReplicaSync async call
48  */
49 static struct tevent_req *dreplsrv_op_notify_send(TALLOC_CTX *mem_ctx,
50                                                   struct tevent_context *ev,
51                                                   struct dreplsrv_notify_operation *op)
52 {
53         struct tevent_req *req;
54         struct dreplsrv_op_notify_state *state;
55         struct tevent_req *subreq;
56
57         req = tevent_req_create(mem_ctx, &state,
58                                 struct dreplsrv_op_notify_state);
59         if (req == NULL) {
60                 return NULL;
61         }
62         state->op = op;
63
64         subreq = dreplsrv_out_drsuapi_send(state,
65                                            ev,
66                                            op->source_dsa->conn);
67         if (tevent_req_nomem(subreq, req)) {
68                 return tevent_req_post(req, ev);
69         }
70         tevent_req_set_callback(subreq, dreplsrv_op_notify_connect_done, req);
71
72         return req;
73 }
74
75 static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req);
76
77 static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq)
78 {
79         struct tevent_req *req = tevent_req_callback_data(subreq,
80                                                           struct tevent_req);
81         NTSTATUS status;
82
83         status = dreplsrv_out_drsuapi_recv(subreq);
84         TALLOC_FREE(subreq);
85         if (tevent_req_nterror(req, status)) {
86                 return;
87         }
88
89         dreplsrv_op_notify_replica_sync_trigger(req);
90 }
91
92 static void dreplsrv_op_notify_replica_sync_done(struct rpc_request *rreq);
93
94 static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req)
95 {
96         struct dreplsrv_op_notify_state *state =
97                 tevent_req_data(req,
98                 struct dreplsrv_op_notify_state);
99         struct dreplsrv_partition *partition = state->op->source_dsa->partition;
100         struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
101         struct rpc_request *rreq;
102         struct drsuapi_DsReplicaSync *r;
103
104         r = talloc_zero(state, struct drsuapi_DsReplicaSync);
105         if (tevent_req_nomem(r, req)) {
106                 return;
107         }
108         r->in.bind_handle       = &drsuapi->bind_handle;
109         r->in.level = 1;
110         r->in.req.req1.naming_context = &partition->nc;
111         r->in.req.req1.source_dsa_guid = state->op->service->ntds_guid;
112         r->in.req.req1.options = 
113                 DRSUAPI_DS_REPLICA_SYNC_ASYNCHRONOUS_OPERATION |
114                 DRSUAPI_DS_REPLICA_SYNC_WRITEABLE |
115                 DRSUAPI_DS_REPLICA_SYNC_ALL_SOURCES;
116
117         rreq = dcerpc_drsuapi_DsReplicaSync_send(drsuapi->pipe, r, r);
118         if (tevent_req_nomem(rreq, req)) {
119                 return;
120         }
121         composite_continue_rpc(NULL, rreq, dreplsrv_op_notify_replica_sync_done, req);
122 }
123
124 static void dreplsrv_op_notify_replica_sync_done(struct rpc_request *rreq)
125 {
126         struct tevent_req *req = talloc_get_type(rreq->async.private_data,
127                                                  struct tevent_req);
128         struct drsuapi_DsReplicaSync *r = talloc_get_type(rreq->ndr.struct_ptr,
129                                                           struct drsuapi_DsReplicaSync);
130         NTSTATUS status;
131
132         status = dcerpc_ndr_request_recv(rreq);
133         if (tevent_req_nterror(req, status)) {
134                 return;
135         }
136
137         if (!W_ERROR_IS_OK(r->out.result)) {
138                 status = werror_to_ntstatus(r->out.result);
139                 tevent_req_nterror(req, status);
140                 return;
141         }
142
143         tevent_req_done(req);
144 }
145
146 static NTSTATUS dreplsrv_op_notify_recv(struct tevent_req *req)
147 {
148         return tevent_req_simple_recv_ntstatus(req);
149 }
150
151 static void dreplsrv_notify_del_repsTo(struct dreplsrv_notify_operation *op)
152 {
153         uint32_t count;
154         struct repsFromToBlob *reps;
155         WERROR werr;
156         struct dreplsrv_service *s = op->service;
157         int i;
158
159         werr = dsdb_loadreps(s->samdb, op, op->source_dsa->partition->dn, "repsTo", &reps, &count);
160         if (!W_ERROR_IS_OK(werr)) {
161                 DEBUG(0,(__location__ ": Failed to load repsTo for %s\n",
162                          ldb_dn_get_linearized(op->source_dsa->partition->dn)));
163                 return;
164         }
165
166         for (i=0; i<count; i++) {
167                 if (GUID_compare(&reps[i].ctr.ctr1.source_dsa_obj_guid, 
168                                  &op->source_dsa->repsFrom1->source_dsa_obj_guid) == 0) {
169                         memmove(&reps[i], &reps[i+1],
170                                 sizeof(reps[i])*(count-(i+1)));
171                         count--;
172                 }
173         }
174
175         werr = dsdb_savereps(s->samdb, op, op->source_dsa->partition->dn, "repsTo", reps, count);
176         if (!W_ERROR_IS_OK(werr)) {
177                 DEBUG(0,(__location__ ": Failed to save repsTo for %s\n",
178                          ldb_dn_get_linearized(op->source_dsa->partition->dn)));
179                 return;
180         }
181 }
182
183 /*
184   called when a notify operation has completed
185  */
186 static void dreplsrv_notify_op_callback(struct tevent_req *subreq)
187 {
188         struct dreplsrv_notify_operation *op =
189                 tevent_req_callback_data(subreq,
190                 struct dreplsrv_notify_operation);
191         NTSTATUS status;
192         struct dreplsrv_service *s = op->service;
193
194         status = dreplsrv_op_notify_recv(subreq);
195         TALLOC_FREE(subreq);
196         if (!NT_STATUS_IS_OK(status)) {
197                 DEBUG(0,("dreplsrv_notify: Failed to send DsReplicaSync to %s for %s - %s\n",
198                          op->source_dsa->repsFrom1->other_info->dns_name,
199                          ldb_dn_get_linearized(op->source_dsa->partition->dn),
200                          nt_errstr(status)));
201         } else {
202                 DEBUG(2,("dreplsrv_notify: DsReplicaSync OK for %s\n",
203                          op->source_dsa->repsFrom1->other_info->dns_name));
204                 op->source_dsa->notify_uSN = op->uSN;
205                 /* delete the repsTo for this replication partner in the
206                    partition, as we have successfully told him to sync */
207                 dreplsrv_notify_del_repsTo(op);
208         }
209
210         talloc_free(op);
211         s->ops.n_current = NULL;
212         dreplsrv_notify_run_ops(s);
213 }
214
215 /*
216   run any pending replica sync calls
217  */
218 void dreplsrv_notify_run_ops(struct dreplsrv_service *s)
219 {
220         struct dreplsrv_notify_operation *op;
221         struct tevent_req *subreq;
222
223         if (s->ops.n_current || s->ops.current) {
224                 /* if there's still one running, we're done */
225                 return;
226         }
227
228         if (!s->ops.notifies) {
229                 /* if there're no pending operations, we're done */
230                 return;
231         }
232
233         op = s->ops.notifies;
234         s->ops.n_current = op;
235         DLIST_REMOVE(s->ops.notifies, op);
236
237         subreq = dreplsrv_op_notify_send(op, s->task->event_ctx, op);
238         if (!subreq) {
239                 DEBUG(0,("dreplsrv_notify_run_ops: dreplsrv_op_notify_send[%s][%s] - no memory\n",
240                          op->source_dsa->repsFrom1->other_info->dns_name,
241                          ldb_dn_get_linearized(op->source_dsa->partition->dn)));
242                 return;
243         }
244         tevent_req_set_callback(subreq, dreplsrv_notify_op_callback, op);
245 }
246
247
248 /*
249   find a source_dsa for a given guid
250  */
251 static struct dreplsrv_partition_source_dsa *dreplsrv_find_source_dsa(struct dreplsrv_partition *p,
252                                                                       struct GUID *guid)
253 {
254         struct dreplsrv_partition_source_dsa *s;
255
256         for (s=p->sources; s; s=s->next) {
257                 if (GUID_compare(&s->repsFrom1->source_dsa_obj_guid, guid) == 0) {
258                         return s;
259                 }
260         }
261         return NULL;
262 }
263
264
265 /*
266   schedule a replicaSync message
267  */
268 static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service,
269                                             struct dreplsrv_partition *p,
270                                             struct repsFromToBlob *reps,
271                                             TALLOC_CTX *mem_ctx,
272                                             uint64_t uSN)
273 {
274         struct dreplsrv_notify_operation *op;
275         struct dreplsrv_partition_source_dsa *s;
276
277         s = dreplsrv_find_source_dsa(p, &reps->ctr.ctr1.source_dsa_obj_guid);
278         if (s == NULL) {
279                 DEBUG(0,(__location__ ": Unable to find source_dsa for %s\n",
280                          GUID_string(mem_ctx, &reps->ctr.ctr1.source_dsa_obj_guid)));
281                 return WERR_DS_UNAVAILABLE;
282         }
283
284         op = talloc_zero(mem_ctx, struct dreplsrv_notify_operation);
285         W_ERROR_HAVE_NO_MEMORY(op);
286
287         op->service     = service;
288         op->source_dsa  = s;
289         op->uSN         = uSN;
290
291         DLIST_ADD_END(service->ops.notifies, op, struct dreplsrv_notify_operation *);
292         talloc_steal(service, op);
293         return WERR_OK;
294 }
295
296 /*
297   see if a partition has a hugher uSN than what is in the repsTo and
298   if so then send a DsReplicaSync
299  */
300 static WERROR dreplsrv_notify_check(struct dreplsrv_service *s, 
301                                     struct dreplsrv_partition *p,
302                                     TALLOC_CTX *mem_ctx)
303 {
304         uint32_t count=0;
305         struct repsFromToBlob *reps;
306         WERROR werr;
307         uint64_t uSN;
308         int ret, i;
309
310         werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count);
311         if (count == 0) {
312                 werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsFrom", &reps, &count);
313         }
314         if (!W_ERROR_IS_OK(werr)) {
315                 DEBUG(0,(__location__ ": Failed to load repsTo for %s\n",
316                          ldb_dn_get_linearized(p->dn)));
317                 return werr;
318         }
319
320         /* loads the partition uSNHighest */
321         ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSN, NULL);
322         if (ret != LDB_SUCCESS || uSN == 0) {
323                 /* nothing to do */
324                 return WERR_OK;
325         }
326
327         /* see if any of our partners need some of our objects */
328         for (i=0; i<count; i++) {
329                 struct dreplsrv_partition_source_dsa *sdsa;
330                 sdsa = dreplsrv_find_source_dsa(p, &reps[i].ctr.ctr1.source_dsa_obj_guid);
331                 if (sdsa == NULL) continue;
332                 if (sdsa->notify_uSN < uSN) {
333                         /* we need to tell this partner to replicate
334                            with us */
335                         werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx, uSN);
336                         if (!W_ERROR_IS_OK(werr)) {
337                                 DEBUG(0,(__location__ ": Failed to setup notify to %s for %s\n",
338                                          reps[i].ctr.ctr1.other_info->dns_name,
339                                          ldb_dn_get_linearized(p->dn)));
340                                 return werr;
341                         }
342                 }
343         }
344
345         return WERR_OK;
346 }
347
348 /*
349   see if any of the partitions have changed, and if so then send a
350   DsReplicaSync to all the replica partners in the repsTo object
351  */
352 static WERROR dreplsrv_notify_check_all(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx)
353 {
354         WERROR status;
355         struct dreplsrv_partition *p;
356
357         for (p = s->partitions; p; p = p->next) {
358                 status = dreplsrv_notify_check(s, p, mem_ctx);
359                 W_ERROR_NOT_OK_RETURN(status);
360         }
361
362         return WERR_OK;
363 }
364
365 static void dreplsrv_notify_run(struct dreplsrv_service *service);
366
367 static void dreplsrv_notify_handler_te(struct tevent_context *ev, struct tevent_timer *te,
368                                        struct timeval t, void *ptr)
369 {
370         struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service);
371         WERROR status;
372
373         service->notify.te = NULL;
374
375         dreplsrv_notify_run(service);
376
377         status = dreplsrv_notify_schedule(service, service->notify.interval);
378         if (!W_ERROR_IS_OK(status)) {
379                 task_server_terminate(service->task, win_errstr(status), false);
380                 return;
381         }
382 }
383
384 WERROR dreplsrv_notify_schedule(struct dreplsrv_service *service, uint32_t next_interval)
385 {
386         TALLOC_CTX *tmp_mem;
387         struct tevent_timer *new_te;
388         struct timeval next_time;
389
390         /* prevent looping */
391         if (next_interval == 0) next_interval = 1;
392
393         next_time = timeval_current_ofs(next_interval, 50);
394
395         if (service->notify.te) {
396                 /*
397                  * if the timestamp of the new event is higher,
398                  * as current next we don't need to reschedule
399                  */
400                 if (timeval_compare(&next_time, &service->notify.next_event) > 0) {
401                         return WERR_OK;
402                 }
403         }
404
405         /* reset the next scheduled timestamp */
406         service->notify.next_event = next_time;
407
408         new_te = event_add_timed(service->task->event_ctx, service,
409                                  service->notify.next_event,
410                                  dreplsrv_notify_handler_te, service);
411         W_ERROR_HAVE_NO_MEMORY(new_te);
412
413         tmp_mem = talloc_new(service);
414         DEBUG(4,("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n",
415                 next_interval,
416                 (service->notify.te?"re":""),
417                 nt_time_string(tmp_mem, timeval_to_nttime(&next_time))));
418         talloc_free(tmp_mem);
419
420         talloc_free(service->notify.te);
421         service->notify.te = new_te;
422
423         return WERR_OK;
424 }
425
426 static void dreplsrv_notify_run(struct dreplsrv_service *service)
427 {
428         TALLOC_CTX *mem_ctx;
429
430         mem_ctx = talloc_new(service);
431         dreplsrv_notify_check_all(service, mem_ctx);
432         talloc_free(mem_ctx);
433
434         dreplsrv_run_pending_ops(service);
435         dreplsrv_notify_run_ops(service);
436 }