/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"


/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct srvid_request *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct srvid_request *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

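/* Queue an SRVID request to be replied to later.  The request is
 * stolen onto the list, so the caller must not free it.  If it cannot
 * be queued, the requester is immediately sent an -ENOMEM result. */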
static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct srvid_request *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct srvid_request *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
        struct tevent_timer *timer;
        bool in_progress;
        const char *name;
};

static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
{
        struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);

        if (state != NULL) {
                state->in_progress = false;
                state->name = name;
        }

        return state;
}

static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
        return state->timer != NULL;
}

static bool ctdb_op_begin(struct ctdb_op_state *state)
{
        if (ctdb_op_is_disabled(state)) {
                DEBUG(DEBUG_NOTICE,
                      ("Unable to begin - %s are disabled\n", state->name));
                return false;
        }

        state->in_progress = true;
        return true;
}

static bool ctdb_op_end(struct ctdb_op_state *state)
{
        state->in_progress = false;
        return false;
}

static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
        return state->in_progress;
}

static void ctdb_op_enable(struct ctdb_op_state *state)
{
        TALLOC_FREE(state->timer);
}

static void ctdb_op_timeout_handler(struct event_context *ev,
                                    struct timed_event *te,
                                    struct timeval yt, void *p)
{
        struct ctdb_op_state *state =
                talloc_get_type(p, struct ctdb_op_state);

        DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
        ctdb_op_enable(state);
}

static int ctdb_op_disable(struct ctdb_op_state *state,
                           struct tevent_context *ev,
                           uint32_t timeout)
{
        if (timeout == 0) {
                DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
                ctdb_op_enable(state);
                return 0;
        }

        if (state->in_progress) {
                DEBUG(DEBUG_ERR,
                      ("Unable to disable %s - in progress\n", state->name));
                return -EAGAIN;
        }

        DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
                            state->name, timeout));

        /* Clear any old timers */
        talloc_free(state->timer);

        /* Arrange for the timeout to occur */
        state->timer = tevent_add_timer(ev, state,
                                        timeval_current_ofs(timeout, 0),
                                        ctdb_op_timeout_handler, state);
        if (state->timer == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
                return -ENOMEM;
        }

        return 0;
}

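/* Typical usage, as a sketch: an operation guards itself with
 *
 *   if (!ctdb_op_begin(rec->takeover_run)) {
 *           return;
 *   }
 *   ... perform the takeover run ...
 *   ctdb_op_end(rec->takeover_run);
 *
 * while an administrative request disables it for a while via
 * ctdb_op_disable(rec->takeover_run, ctdb->ev, timeout), after which
 * the timer re-enables it. */

/* Per-node misbehaviour accounting, used to decide when a node has
 * caused enough trouble to be banned */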
struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_lmasters;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
        struct srvid_requests *reallocate_requests;
        struct ctdb_op_state *takeover_run;
        struct ctdb_op_state *recovery;
        struct ctdb_control_get_ifaces *ifaces;
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
};

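/* Timeouts for controls and for the monitoring loop, driven by the
 * recover_timeout and recover_interval tunables respectively */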
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_time bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }

}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the troublemaker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

/*
  remember the troublemaker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}


/* this callback is called for every node that failed to execute the
   recovered event
*/
static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "recovered" eventscript on all nodes
 */
static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, recovered_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));

                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* this callback is called for every node that failed to execute the
   start recovery event
*/
static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "startrecovery" eventscript on all nodes
 */
static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  update the node capabilities for all connected nodes
 */
static int update_capabilities(struct ctdb_recoverd *rec,
                               struct ctdb_node_map *nodemap)
{
        uint32_t *capp;
        TALLOC_CTX *tmp_ctx;
        struct ctdb_node_capabilities *caps;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(rec);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        caps = ctdb_get_capabilities(ctdb, tmp_ctx,
                                     CONTROL_TIMEOUT(), nodemap);

        if (caps == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Failed to get node capabilities\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
        if (capp == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__
                       " Capabilities don't include current node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
        ctdb->capabilities = *capp;

        TALLOC_FREE(rec->caps);
        rec->caps = talloc_steal(rec, caps);

        talloc_free(tmp_ctx);
        return 0;
}

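/* A node that fails to freeze during recovery collects one culprit
 * credit per node in the map */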
static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

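/* Likewise for a node that fails to start its recovery transaction */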
static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        /* freeze all nodes */
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;

                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, i,
                                                CONTROL_TIMEOUT(),
                                                false, tdb_null,
                                                NULL,
                                                set_recmode_fail_callback,
                                                rec) != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                }
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery master on all nodes
 */
static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
{
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* update all remote nodes to use the same db priority that we have.
   This can fail if the remote node has not yet been upgraded to
   support this function, so we always return success and never fail
   a recovery if this call fails.
*/
static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
        struct ctdb_node_map *nodemap,
        uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int db;

        /* step through all local databases */
        for (db=0; db<dbmap->num;db++) {
                struct ctdb_db_priority db_prio;
                int ret;

                db_prio.db_id     = dbmap->dbs[db].dbid;
                ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
                        continue;
                }

                DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority));

                ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
                                                CTDB_CURRENT_NODE, &db_prio);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
                                         db_prio.db_id));
                }
        }

        return 0;
}

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;


                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].dbid, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                            remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                           remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  pull the remote database contents from one node into the recdb
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

        for (i=0;
             i<reply->count;
             rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}


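/* State shared by the GET_DB_SEQNUM callbacks below: records the
 * highest sequence number seen so far and the node that reported it */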
struct pull_seqnum_cbdata {
        int failed;
        uint32_t pnn;
        uint64_t seqnum;
};

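/* Called for each node that answers GET_DB_SEQNUM; remembers the node
 * reporting the highest sequence number for the database */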
static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
        uint64_t seqnum;

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
                return;
        }

        if (res != 0) {
                DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
                cb_data->failed = 1;
                return;
        }

        if (outdata.dsize != sizeof(uint64_t)) {
                DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
                cb_data->failed = 1;
                return;
        }

        seqnum = *((uint64_t *)outdata.dptr);

        if (seqnum > cb_data->seqnum ||
            (cb_data->pnn == -1 && seqnum == 0)) {
                cb_data->seqnum = seqnum;
                cb_data->pnn = node_pnn;
        }
}

static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);

        DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
        cb_data->failed = 1;
}

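/* For a persistent database, find the node with the highest sequence
 * number and pull the whole database from that single node */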
static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid)
{
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        uint32_t *nodes;
        TDB_DATA data;
        uint32_t outdata[2];
        struct pull_seqnum_cbdata *cb_data;

        DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));

        outdata[0] = dbid;
        outdata[1] = 0;

        data.dsize = sizeof(outdata);
        data.dptr  = (uint8_t *)&outdata[0];

        cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
        if (cb_data == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        cb_data->failed = 0;
        cb_data->pnn    = -1;
        cb_data->seqnum = 0;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        pull_seqnum_cb,
                                        pull_seqnum_fail_cb,
                                        cb_data) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));

                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->pnn == -1) {
                DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum));

        if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}


/*
  pull all the remote database contents into the recdb
 */
static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
{
        int j;

        if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
                int ret;
                ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
                if (ret == 0) {
                        return 0;
                }
        }

        /* pull all records from all other nodes across onto this node
           (this merges based on rsn)
        */
        for (j=0; j<nodemap->num; j++) {
                /* don't merge from nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n",
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }

        return 0;
}


/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  ensure all nodes have the same vnnmap we do
 */
static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
{
        int j, ret;

        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* don't push to nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                        return -1;
                }
        }

        return 0;
}


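/* One in-flight batch of records received from a peer node's vacuuming
 * run for a particular database */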
struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
        uint32_t srcnode;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;
};

static void vacuum_fetch_next(struct vacuum_info *v);

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}


/*
  process the next element from the vacuum list
*/
static void vacuum_fetch_next(struct vacuum_info *v)
{
        struct ctdb_call call;
        struct ctdb_rec_data *r;

        while (v->recs->count) {
                struct ctdb_client_call_state *state;
                TDB_DATA data;
                struct ctdb_ltdb_header *hdr;

                ZERO_STRUCT(call);
                call.call_id = CTDB_NULL_FUNC;
                call.flags = CTDB_IMMEDIATE_MIGRATION;
                call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

                r = v->r;
                v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
                v->recs->count--;

                call.key.dptr = &r->data[0];
                call.key.dsize = r->keylen;

                /* ensure we don't block this daemon - just skip a record if we can't get
                   the chainlock */
                if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
                        continue;
                }

                data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
                if (data.dptr == NULL) {
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                hdr = (struct ctdb_ltdb_header *)data.dptr;
                if (hdr->dmaster == v->rec->ctdb->pnn) {
                        /* it's already local */
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                free(data.dptr);

                state = ctdb_call_send(v->ctdb_db, &call);
                tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                if (state == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                        talloc_free(v);
                        return;
                }
                state->async.fn = vacuum_fetch_callback;
                state->async.private_data = NULL;
        }

        talloc_free(v);
}


/*
  destroy a vacuum info structure
 */
static int vacuum_info_destructor(struct vacuum_info *v)
{
        DLIST_REMOVE(v->rec->vacuum_info, v);
        return 0;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                 TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data *r;
        uint32_t srcnode;
        struct vacuum_info *v;

        recs = (struct ctdb_marshall_buffer *)data.dptr;
        r = (struct ctdb_rec_data *)&recs->data[0];

        if (recs->count == 0) {
                talloc_free(tmp_ctx);
                return;
        }

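        /* the sending node stashes its pnn in the reqid field of the
           first record */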
        srcnode = r->reqid;

        for (v=rec->vacuum_info;v;v=v->next) {
                if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
                        /* we're already working on records from this node */
                        talloc_free(tmp_ctx);
                        return;
                }
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                talloc_free(tmp_ctx);
                return;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].dbid == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(tmp_ctx);
                return;
        }

        v = talloc_zero(rec, struct vacuum_info);
        if (v == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(tmp_ctx);
                return;
        }

        v->rec = rec;
        v->srcnode = srcnode;
        v->ctdb_db = ctdb_db;
        v->recs = talloc_memdup(v, recs, data.dsize);
        if (v->recs == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(v);
                talloc_free(tmp_ctx);
                return;
        }
        v->r = (struct ctdb_rec_data *)&v->recs->data[0];

        DLIST_ADD(rec->vacuum_info, v);

        talloc_set_destructor(v, vacuum_info_destructor);

        vacuum_fetch_next(v);
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                    TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data,
                                                    struct ctdb_recoverd);
        uint32_t db_id;
        struct vacuum_info *v, *vnext;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        /* Stop any active vacuum fetch */
        v = rec->vacuum_info;
        while (v != NULL) {
                vnext = v->next;

                if (v->ctdb_db->db_id == db_id) {
                        talloc_free(v);
                }
                v = vnext;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
}

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,("Election period ended\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                event_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return MONITOR_FAILED;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return MONITOR_OK;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}


/*
  create a temporary working database
 */
static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
{
        char *name;
        struct tdb_wrap *recdb;
        unsigned tdb_flags;

        /* open up the temporary recovery database */
        name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
                               ctdb->db_directory_state,
                               ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);

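        /* The recdb is private to this recovery daemon, so locking is
         * unnecessary; TDB_NOMMAP avoids mmap-related noise when running
         * under valgrind (assumed rationale for these flags) */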
        tdb_flags = TDB_NOLOCK;
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);

        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size,
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
        if (recdb == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
        }

        talloc_free(name);

        return recdb;
}


/*
   a traverse function for pulling all relevant records from recdb
 */
struct recdb_data {
        struct ctdb_context *ctdb;
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        uint32_t allocated_len;
        bool failed;
        bool persistent;
};

static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
        struct recdb_data *params = (struct recdb_data *)p;
        struct ctdb_rec_data *rec;
        struct ctdb_ltdb_header *hdr;

        /*
         * skip empty records - but NOT for persistent databases:
         *
         * The record-by-record mode of recovery deletes empty records.
         * For persistent databases, this can lead to data corruption
         * by deleting records that should be there:
         *
         * - Assume the cluster has been running for a while.
         *
         * - A record R in a persistent database has been created and
         *   deleted a couple of times, the last operation being deletion,
         *   leaving an empty record with a high RSN, say 10.
         *
         * - Now a node N is turned off.
         *
         * - This leaves the local database copy of D on N with the empty
         *   copy of R and RSN 10. On all other nodes, the recovery has deleted
         *   the copy of record R.
         *
         * - Now the record is created again while node N is turned off.
         *   This creates R with RSN = 1 on all nodes except for N.
         *
         * - Now node N is turned on again. The following recovery will choose
         *   the older empty copy of R due to RSN 10 > RSN 1.
1439          *
1440          * ==> Hence the record is gone after the recovery.
1441          *
1442          * On databases like Samba's registry, this can damage the higher-level
1443          * data structures built from the various tdb-level records.
1444          */
1445         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1446                 return 0;
1447         }
1448
1449         /* update the dmaster field to point to us */
1450         hdr = (struct ctdb_ltdb_header *)data.dptr;
1451         if (!params->persistent) {
1452                 hdr->dmaster = params->ctdb->pnn;
1453                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1454         }
1455
1456         /* add the record to the blob ready to send to the nodes */
1457         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1458         if (rec == NULL) {
1459                 params->failed = true;
1460                 return -1;
1461         }
1462         if (params->len + rec->length >= params->allocated_len) {
1463                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1464                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1465         }
1466         if (params->recdata == NULL) {
1467                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1468                          rec->length + params->len));
1469                 params->failed = true;
1470                 return -1;
1471         }
1472         params->recdata->count++;
1473         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1474         params->len += rec->length;
1475         talloc_free(rec);
1476
1477         return 0;
1478 }
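
/* A rough sketch (not authoritative) of the blob that traverse_recdb()
   builds via ctdb_marshall_record():

     struct ctdb_marshall_buffer    db_id, count, ...
     [record 0]                     packed struct ctdb_rec_data
     [record 1]
     ...

   params->len always points one byte past the last packed record,
   which is why the memcpy above copies each new record to
   params->len + (uint8_t *)params->recdata.
 */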
1479
1480 /*
1481   push the recdb database out to all nodes
1482  */
1483 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1484                                bool persistent,
1485                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1486 {
1487         struct recdb_data params;
1488         struct ctdb_marshall_buffer *recdata;
1489         TDB_DATA outdata;
1490         TALLOC_CTX *tmp_ctx;
1491         uint32_t *nodes;
1492
1493         tmp_ctx = talloc_new(ctdb);
1494         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1495
1496         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1497         CTDB_NO_MEMORY(ctdb, recdata);
1498
1499         recdata->db_id = dbid;
1500
1501         params.ctdb = ctdb;
1502         params.recdata = recdata;
1503         params.len = offsetof(struct ctdb_marshall_buffer, data);
1504         params.allocated_len = params.len;
1505         params.failed = false;
1506         params.persistent = persistent;
1507
1508         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1509                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1510                 talloc_free(params.recdata);
1511                 talloc_free(tmp_ctx);
1512                 return -1;
1513         }
1514
1515         if (params.failed) {
1516                 DEBUG(DEBUG_ERR,(__location__ " Traverse of recdb database failed while marshalling records\n"));
1517                 talloc_free(params.recdata);
1518                 talloc_free(tmp_ctx);
1519                 return -1;
1520         }
1521
1522         recdata = params.recdata;
1523
1524         outdata.dptr = (void *)recdata;
1525         outdata.dsize = params.len;
1526
1527         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1528         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1529                                         nodes, 0,
1530                                         CONTROL_TIMEOUT(), false, outdata,
1531                                         NULL, NULL,
1532                                         NULL) != 0) {
1533                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1534                 talloc_free(recdata);
1535                 talloc_free(tmp_ctx);
1536                 return -1;
1537         }
1538
1539         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x containing %u records\n",
1540                   dbid, recdata->count));
1541
1542         talloc_free(recdata);
1543         talloc_free(tmp_ctx);
1544
1545         return 0;
1546 }
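
/* Note: list_of_active_nodes() is called with its final argument
   (include ourselves) set to true, so the merged blob is pushed to
   every active node including this one and all nodes end up with
   identical copies of the recovered database.
 */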
1547
1548
1549 /*
1550   go through a full recovery on one database 
1551  */
1552 static int recover_database(struct ctdb_recoverd *rec, 
1553                             TALLOC_CTX *mem_ctx,
1554                             uint32_t dbid,
1555                             bool persistent,
1556                             uint32_t pnn, 
1557                             struct ctdb_node_map *nodemap,
1558                             uint32_t transaction_id)
1559 {
1560         struct tdb_wrap *recdb;
1561         int ret;
1562         struct ctdb_context *ctdb = rec->ctdb;
1563         TDB_DATA data;
1564         struct ctdb_control_wipe_database w;
1565         uint32_t *nodes;
1566
1567         recdb = create_recdb(ctdb, mem_ctx);
1568         if (recdb == NULL) {
1569                 return -1;
1570         }
1571
1572         /* pull all remote databases onto the recdb */
1573         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1574         if (ret != 0) {
1575                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1576                 return -1;
1577         }
1578
1579         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1580
1581         /* wipe all the remote databases. This is safe as we are in a transaction */
1582         w.db_id = dbid;
1583         w.transaction_id = transaction_id;
1584
1585         data.dptr = (void *)&w;
1586         data.dsize = sizeof(w);
1587
1588         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1589         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1590                                         nodes, 0,
1591                                         CONTROL_TIMEOUT(), false, data,
1592                                         NULL, NULL,
1593                                         NULL) != 0) {
1594                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1595                 talloc_free(recdb);
1596                 return -1;
1597         }
1598         
1599         /* push out the correct database. This sets the dmaster and skips 
1600            the empty records */
1601         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1602         if (ret != 0) {
1603                 talloc_free(recdb);
1604                 return -1;
1605         }
1606
1607         /* all done with this database */
1608         talloc_free(recdb);
1609
1610         return 0;
1611 }
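
/* In short, per-database recovery is pull -> wipe -> push: pull every
   remote copy into a throwaway local tdb (record winners chosen by
   highest RSN, as described above traverse_recdb()), wipe all copies
   inside the still-open recovery transaction, then push the merged
   database back out.  The transaction only commits later in
   do_recovery(), so a failure at any step leaves no node with partial
   state.
 */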
1612
1613 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1614                                          struct ctdb_recoverd *rec,
1615                                          struct ctdb_node_map *nodemap,
1616                                          uint32_t *culprit)
1617 {
1618         int j;
1619         int ret;
1620
1621         if (ctdb->num_nodes != nodemap->num) {
1622                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1623                                   ctdb->num_nodes, nodemap->num));
1624                 if (culprit) {
1625                         *culprit = ctdb->pnn;
1626                 }
1627                 return -1;
1628         }
1629
1630         for (j=0; j<nodemap->num; j++) {
1631                 /* For readability */
1632                 struct ctdb_node *node = ctdb->nodes[j];
1633
1634                 /* release any existing data */
1635                 if (node->known_public_ips) {
1636                         talloc_free(node->known_public_ips);
1637                         node->known_public_ips = NULL;
1638                 }
1639                 if (node->available_public_ips) {
1640                         talloc_free(node->available_public_ips);
1641                         node->available_public_ips = NULL;
1642                 }
1643
1644                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1645                         continue;
1646                 }
1647
1648                 /* Retrieve the list of known public IPs from the node */
1649                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1650                                         CONTROL_TIMEOUT(),
1651                                         node->pnn,
1652                                         ctdb->nodes,
1653                                         0,
1654                                         &node->known_public_ips);
1655                 if (ret != 0) {
1656                         DEBUG(DEBUG_ERR,
1657                               ("Failed to read known public IPs from node: %u\n",
1658                                node->pnn));
1659                         if (culprit) {
1660                                 *culprit = node->pnn;
1661                         }
1662                         return -1;
1663                 }
1664
1665                 if (ctdb->do_checkpublicip &&
1666                     !ctdb_op_is_disabled(rec->takeover_run) &&
1667                     verify_remote_ip_allocation(ctdb,
1668                                                  node->known_public_ips,
1669                                                  node->pnn)) {
1670                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1671                         rec->need_takeover_run = true;
1672                 }
1673
1674                 /* Retrieve the list of available public IPs from the node */
1675                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1676                                         CONTROL_TIMEOUT(),
1677                                         node->pnn,
1678                                         ctdb->nodes,
1679                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1680                                         &node->available_public_ips);
1681                 if (ret != 0) {
1682                         DEBUG(DEBUG_ERR,
1683                               ("Failed to read available public IPs from node: %u\n",
1684                                node->pnn));
1685                         if (culprit) {
1686                                 *culprit = node->pnn;
1687                         }
1688                         return -1;
1689                 }
1690         }
1691
1692         return 0;
1693 }
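
/* On failure the culprit out-parameter, when supplied, names the node
   whose control call failed (or this node for the nodemap mismatch
   above), so callers such as do_recovery() can report the right node.
 */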
1694
1695 /* when we start a recovery, make sure all nodes use the same reclock file
1696    setting
1697 */
1698 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1699 {
1700         struct ctdb_context *ctdb = rec->ctdb;
1701         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1702         TDB_DATA data;
1703         uint32_t *nodes;
1704
1705         if (ctdb->recovery_lock_file == NULL) {
1706                 data.dptr  = NULL;
1707                 data.dsize = 0;
1708         } else {
1709                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1710                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1711         }
1712
1713         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1714         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1715                                         nodes, 0,
1716                                         CONTROL_TIMEOUT(),
1717                                         false, data,
1718                                         NULL, NULL,
1719                                         rec) != 0) {
1720                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1721                 talloc_free(tmp_ctx);
1722                 return -1;
1723         }
1724
1725         talloc_free(tmp_ctx);
1726         return 0;
1727 }
1728
1729
1730 /*
1731  * This callback is called for every node that failed to execute
1732  * ctdb_takeover_run(); the failing node is recorded as a culprit.
1733  */
1734 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1735 {
1736         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1737
1738         if (callback_data != NULL) {
1739                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1740
1741                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1742
1743                 ctdb_set_culprit(rec, node_pnn);
1744         }
1745 }
1746
1747
1748 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1749 {
1750         struct ctdb_context *ctdb = rec->ctdb;
1751         int i;
1752         struct ctdb_banning_state *ban_state;
1753
1754         *self_ban = false;
1755         for (i=0; i<ctdb->num_nodes; i++) {
1756                 if (ctdb->nodes[i]->ban_state == NULL) {
1757                         continue;
1758                 }
1759                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1760                 if (ban_state->count < 2*ctdb->num_nodes) {
1761                         continue;
1762                 }
1763
1764                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1765                         ctdb->nodes[i]->pnn, ban_state->count,
1766                         ctdb->tunable.recovery_ban_period));
1767                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1768                 ban_state->count = 0;
1769
1770                 /* Banning ourselves? */
1771                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1772                         *self_ban = true;
1773                 }
1774         }
1775 }
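
/* Worked example: in a 4-node cluster a node is banned only once its
   banning credits reach 2 * 4 = 8, so a node is never banned for an
   isolated failure; only repeat offenders cross the threshold.
 */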
1776
1777 static bool do_takeover_run(struct ctdb_recoverd *rec,
1778                             struct ctdb_node_map *nodemap,
1779                             bool banning_credits_on_fail)
1780 {
1781         uint32_t *nodes = NULL;
1782         struct srvid_request_data dtr;
1783         TDB_DATA data;
1784         int i;
1785         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1786         int ret;
1787         bool ok;
1788
1789         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1790
1791         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1792                 DEBUG(DEBUG_ERR, (__location__
1793                                   " takeover run already in progress\n"));
1794                 ok = false;
1795                 goto done;
1796         }
1797
1798         if (!ctdb_op_begin(rec->takeover_run)) {
1799                 ok = false;
1800                 goto done;
1801         }
1802
1803         /* Disable IP checks (takeover runs, really) on other nodes
1804          * while doing this takeover run.  This will stop those other
1805          * nodes from triggering takeover runs when they think they should
1806          * be hosting an IP but it isn't yet on an interface.  Don't
1807          * wait for replies since a failure here might cause some
1808          * noise in the logs but will not actually cause a problem.
1809          */
1810         dtr.srvid = 0; /* No reply */
1811         dtr.pnn = -1;
1812
1813         data.dptr  = (uint8_t*)&dtr;
1814         data.dsize = sizeof(dtr);
1815
1816         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1817
1818         /* Disable for 60 seconds.  This can be a tunable later if
1819          * necessary.
1820          */
1821         dtr.data = 60;
1822         for (i = 0; i < talloc_array_length(nodes); i++) {
1823                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1824                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1825                                              data) != 0) {
1826                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1827                 }
1828         }
1829
1830         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1831                                 rec->force_rebalance_nodes,
1832                                 takeover_fail_callback,
1833                                 banning_credits_on_fail ? rec : NULL);
1834
1835         /* Reenable takeover runs and IP checks on other nodes */
1836         dtr.data = 0;
1837         for (i = 0; i < talloc_array_length(nodes); i++) {
1838                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1839                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1840                                              data) != 0) {
1841                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1842                 }
1843         }
1844
1845         if (ret != 0) {
1846                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1847                 ok = false;
1848                 goto done;
1849         }
1850
1851         ok = true;
1852         /* Takeover run was successful so clear force rebalance targets */
1853         if (rebalance_nodes == rec->force_rebalance_nodes) {
1854                 TALLOC_FREE(rec->force_rebalance_nodes);
1855         } else {
1856                 DEBUG(DEBUG_WARNING,
1857                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1858         }
1859 done:
1860         rec->need_takeover_run = !ok;
1861         talloc_free(nodes);
1862         ctdb_op_end(rec->takeover_run);
1863
1864         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1865         return ok;
1866 }
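
/* Whatever happens above, rec->need_takeover_run ends up reflecting
   reality: on failure it stays set and the monitoring loop will retry
   the takeover run on a later iteration.
 */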
1867
1868
1869 /*
1870   we are the recmaster, and recovery is needed - start a recovery run
1871  */
1872 static int do_recovery(struct ctdb_recoverd *rec, 
1873                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1874                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1875 {
1876         struct ctdb_context *ctdb = rec->ctdb;
1877         int i, j, ret;
1878         uint32_t generation;
1879         struct ctdb_dbid_map *dbmap;
1880         TDB_DATA data;
1881         uint32_t *nodes;
1882         struct timeval start_time;
1883         uint32_t culprit = (uint32_t)-1;
1884         bool self_ban;
1885
1886         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1887
1888         /* if recovery fails, force it again */
1889         rec->need_recovery = true;
1890
1891         if (!ctdb_op_begin(rec->recovery)) {
1892                 return -1;
1893         }
1894
1895         if (rec->election_timeout) {
1896                 /* an election is in progress */
1897                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
1898                 goto fail;
1899         }
1900
1901         ban_misbehaving_nodes(rec, &self_ban);
1902         if (self_ban) {
1903                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1904                 goto fail;
1905         }
1906
1907         if (ctdb->recovery_lock_file != NULL) {
1908                 if (ctdb_recovery_have_lock(ctdb)) {
1909                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
1910                 } else {
1911                         start_time = timeval_current();
1912                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
1913                                              ctdb->recovery_lock_file));
1914                         if (!ctdb_recovery_lock(ctdb)) {
1915                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1916                                         /* If ctdb is trying its first recovery, it is
1917                                          * possible that the current node does not yet
1918                                          * know who the recmaster is.
1919                                          */
1920                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
1921                                                           " - retrying recovery\n"));
1922                                         goto fail;
1923                                 }
1924
1925                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1926                                                  "and banning ourselves for %u seconds\n",
1927                                                  ctdb->tunable.recovery_ban_period));
1928                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1929                                 goto fail;
1930                         }
1931                         ctdb_ctrl_report_recd_lock_latency(ctdb,
1932                                                            CONTROL_TIMEOUT(),
1933                                                            timeval_elapsed(&start_time));
1934                         DEBUG(DEBUG_NOTICE,
1935                               ("Recovery lock taken successfully by recovery daemon\n"));
1936                 }
1937         }
1938
1939         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1940
1941         /* get a list of all databases */
1942         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1943         if (ret != 0) {
1944                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1945                 goto fail;
1946         }
1947
1948         /* we do the db creation before we set the recovery mode, so the freeze happens
1949            on all databases we will be dealing with. */
1950
1951         /* verify that we have all the databases any other node has */
1952         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1953         if (ret != 0) {
1954                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1955                 goto fail;
1956         }
1957
1958         /* verify that all other nodes have all our databases */
1959         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1960         if (ret != 0) {
1961                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1962                 goto fail;
1963         }
1964         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1965
1966         /* update the database priority for all remote databases */
1967         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1968         if (ret != 0) {
1969                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1970         }
1971         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1972
1973
1974         /* update all other nodes to use the same setting for reclock files
1975            as the local recovery master.
1976         */
1977         sync_recovery_lock_file_across_cluster(rec);
1978
1979         /* set recovery mode to active on all nodes */
1980         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1981         if (ret != 0) {
1982                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1983                 goto fail;
1984         }
1985
1986         /* execute the "startrecovery" event script on all nodes */
1987         ret = run_startrecovery_eventscript(rec, nodemap);
1988         if (ret!=0) {
1989                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1990                 goto fail;
1991         }
1992
1993         /*
1994           update all nodes to have the same flags that we have
1995          */
1996         for (i=0;i<nodemap->num;i++) {
1997                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1998                         continue;
1999                 }
2000
2001                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2002                 if (ret != 0) {
2003                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2004                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2005                         } else {
2006                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2007                                 goto fail;
2008                         }
2009                 }
2010         }
2011
2012         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2013
2014         /* pick a new generation number */
2015         generation = new_generation();
2016
2017         /* change the vnnmap on this node to use the new generation 
2018            number but not on any other nodes.
2019            this guarantees that if we abort the recovery prematurely
2020            for some reason (a node stops responding?)
2021            we can just return immediately and we will re-enter
2022            recovery again shortly.
2023            I.e. we deliberately leave the cluster with an inconsistent
2024            generation id to allow us to abort recovery at any stage and
2025            just restart it from scratch.
2026          */
2027         vnnmap->generation = generation;
2028         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
2029         if (ret != 0) {
2030                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
2031                 goto fail;
2032         }
2033
2034         data.dptr = (void *)&generation;
2035         data.dsize = sizeof(uint32_t);
2036
2037         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2038         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
2039                                         nodes, 0,
2040                                         CONTROL_TIMEOUT(), false, data,
2041                                         NULL,
2042                                         transaction_start_fail_callback,
2043                                         rec) != 0) {
2044                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
2045                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
2046                                         nodes, 0,
2047                                         CONTROL_TIMEOUT(), false, tdb_null,
2048                                         NULL,
2049                                         NULL,
2050                                         NULL) != 0) {
2051                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
2052                 }
2053                 goto fail;
2054         }
2055
2056         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
2057
2058         for (i=0;i<dbmap->num;i++) {
2059                 ret = recover_database(rec, mem_ctx,
2060                                        dbmap->dbs[i].dbid,
2061                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
2062                                        pnn, nodemap, generation);
2063                 if (ret != 0) {
2064                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
2065                         goto fail;
2066                 }
2067         }
2068
2069         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
2070
2071         /* commit all the changes */
2072         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
2073                                         nodes, 0,
2074                                         CONTROL_TIMEOUT(), false, data,
2075                                         NULL, NULL,
2076                                         NULL) != 0) {
2077                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
2078                 goto fail;
2079         }
2080
2081         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
2082         
2083
2084         /* update the capabilities for all nodes */
2085         ret = update_capabilities(rec, nodemap);
2086         if (ret!=0) {
2087                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2088                 goto fail;
2089         }
2090
2091         /* build a new vnn map with all the currently active and
2092            unbanned nodes */
2093         generation = new_generation();
2094         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
2095         CTDB_NO_MEMORY(ctdb, vnnmap);
2096         vnnmap->generation = generation;
2097         vnnmap->size = 0;
2098         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
2099         CTDB_NO_MEMORY(ctdb, vnnmap->map);
2100         for (i=j=0;i<nodemap->num;i++) {
2101                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2102                         continue;
2103                 }
2104                 if (!ctdb_node_has_capabilities(rec->caps,
2105                                                 ctdb->nodes[i]->pnn,
2106                                                 CTDB_CAP_LMASTER)) {
2107                         /* this node cannot be an lmaster */
2108                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
2109                         continue;
2110                 }
2111
2112                 vnnmap->size++;
2113                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2114                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2115                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
2116
2117         }
2118         if (vnnmap->size == 0) {
2119                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
2120                 vnnmap->size++;
2121                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2122                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2123                 vnnmap->map[0] = pnn;
2124         }
2125
2126         /* update to the new vnnmap on all nodes */
2127         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
2128         if (ret != 0) {
2129                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
2130                 goto fail;
2131         }
2132
2133         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2134
2135         /* update recmaster to point to us for all nodes */
2136         ret = set_recovery_master(ctdb, nodemap, pnn);
2137         if (ret!=0) {
2138                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
2139                 goto fail;
2140         }
2141
2142         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
2143
2144         /* disable recovery mode */
2145         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
2146         if (ret != 0) {
2147                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2148                 goto fail;
2149         }
2150
2151         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2152
2153         /* Fetch known/available public IPs from each active node */
2154         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2155         if (ret != 0) {
2156                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2157                                  culprit));
2158                 rec->need_takeover_run = true;
2159                 goto fail;
2160         }
2161
2162         do_takeover_run(rec, nodemap, false);
2163
2164         /* execute the "recovered" event script on all nodes */
2165         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2166         if (ret!=0) {
2167                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2168                 goto fail;
2169         }
2170
2171         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2172
2173         /* send a message to all clients telling them that the cluster 
2174            has been reconfigured */
2175         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2176                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2177         if (ret != 0) {
2178                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2179                 goto fail;
2180         }
2181
2182         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2183
2184         rec->need_recovery = false;
2185         ctdb_op_end(rec->recovery);
2186
2187         /* we managed to complete a full recovery, make sure to forgive
2188            any past sins by the nodes that could now participate in the
2189            recovery.
2190         */
2191         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2192         for (i=0;i<nodemap->num;i++) {
2193                 struct ctdb_banning_state *ban_state;
2194
2195                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2196                         continue;
2197                 }
2198
2199                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2200                 if (ban_state == NULL) {
2201                         continue;
2202                 }
2203
2204                 ban_state->count = 0;
2205         }
2206
2207         /* We just finished a recovery successfully.
2208            We now wait for rerecovery_timeout before we allow
2209            another recovery to take place.
2210         */
2211         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2212         ctdb_op_disable(rec->recovery, ctdb->ev,
2213                         ctdb->tunable.rerecovery_timeout);
2214         return 0;
2215
2216 fail:
2217         ctdb_op_end(rec->recovery);
2218         return -1;
2219 }
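
/* Summary of the recovery sequence implemented above:

    1. ban repeat offenders and take the recovery lock (if configured)
    2. create any missing local and remote databases, sync database
       priorities and the reclock file setting
    3. set CTDB_RECOVERY_ACTIVE everywhere, run the "startrecovery"
       event and push our node flags to all nodes
    4. pick a new generation, start a transaction on all active nodes
       and run recover_database() for every database
    5. commit, rebuild the vnnmap from lmaster-capable active nodes,
       point the recmaster at ourselves and return to
       CTDB_RECOVERY_NORMAL
    6. reload public IPs, do a takeover run, run the "recovered" event,
       broadcast CTDB_SRVID_RECONFIGURE, forgive banning credits and
       suppress further recoveries for rerecovery_timeout seconds
 */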
2220
2221
2222 /*
2223   elections are won by first checking the number of connected nodes, then
2224   the priority time, then the pnn
2225  */
2226 struct election_message {
2227         uint32_t num_connected;
2228         struct timeval priority_time;
2229         uint32_t pnn;
2230         uint32_t node_flags;
2231 };
2232
2233 /*
2234   form this node's election data
2235  */
2236 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2237 {
2238         int ret, i;
2239         struct ctdb_node_map *nodemap;
2240         struct ctdb_context *ctdb = rec->ctdb;
2241
2242         ZERO_STRUCTP(em);
2243
2244         em->pnn = rec->ctdb->pnn;
2245         em->priority_time = rec->priority_time;
2246
2247         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2248         if (ret != 0) {
2249                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2250                 return;
2251         }
2252
2253         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2254         em->node_flags = rec->node_flags;
2255
2256         for (i=0;i<nodemap->num;i++) {
2257                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2258                         em->num_connected++;
2259                 }
2260         }
2261
2262         /* we shouldn't try to win this election if we can't be a recmaster */
2263         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2264                 em->num_connected = 0;
2265                 em->priority_time = timeval_current();
2266         }
2267
2268         talloc_free(nodemap);
2269 }
2270
2271 /*
2272   see if the given election data wins
2273  */
2274 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2275 {
2276         struct election_message myem;
2277         int cmp = 0;
2278
2279         ctdb_election_data(rec, &myem);
2280
2281         /* we can't win if we don't have the recmaster capability */
2282         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2283                 return false;
2284         }
2285
2286         /* we can't win if we are banned */
2287         if (rec->node_flags & NODE_FLAGS_BANNED) {
2288                 return false;
2289         }
2290
2291         /* we can't win if we are stopped */
2292         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2293                 return false;
2294         }
2295
2296         /* we will automatically win if the other node is banned */
2297         if (em->node_flags & NODE_FLAGS_BANNED) {
2298                 return true;
2299         }
2300
2301         /* we will automatically win if the other node is stopped */
2302         if (em->node_flags & NODE_FLAGS_STOPPED) {
2303                 return true;
2304         }
2305
2306         /* try to use the most connected node */
2307         if (cmp == 0) {
2308                 cmp = (int)myem.num_connected - (int)em->num_connected;
2309         }
2310
2311         /* then the longest running node */
2312         if (cmp == 0) {
2313                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2314         }
2315
2316         if (cmp == 0) {
2317                 cmp = (int)myem.pnn - (int)em->pnn;
2318         }
2319
2320         return cmp > 0;
2321 }
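
/* Worked example with illustrative numbers: we have num_connected=4,
   priority_time=T0, pnn=2; the incoming message has num_connected=4,
   priority_time=T1, pnn=0, with T1 > T0 (the other node started
   later).  Connected counts tie; timeval_compare(&em->priority_time,
   &myem.priority_time) is then positive because T1 is later, so
   cmp > 0 and we win: the longer-running node is preferred.  Only on
   a complete tie does the pnn comparison decide, in favour of the
   numerically higher pnn.
 */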
2322
2323 /*
2324   send out an election request
2325  */
2326 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2327 {
2328         int ret;
2329         TDB_DATA election_data;
2330         struct election_message emsg;
2331         uint64_t srvid;
2332         struct ctdb_context *ctdb = rec->ctdb;
2333
2334         srvid = CTDB_SRVID_RECOVERY;
2335
2336         ctdb_election_data(rec, &emsg);
2337
2338         election_data.dsize = sizeof(struct election_message);
2339         election_data.dptr  = (unsigned char *)&emsg;
2340
2341
2342         /* first we assume we will win the election and set 
2343            recovery master to be ourselves on the current node
2344          */
2345         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2346         if (ret != 0) {
2347                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
2348                 return -1;
2349         }
2350
2351
2352         /* send an election message to all active nodes */
2353         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2354         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2355 }
2356
2357 /*
2358   this function will unban all nodes in the cluster
2359 */
2360 static void unban_all_nodes(struct ctdb_context *ctdb)
2361 {
2362         int ret, i;
2363         struct ctdb_node_map *nodemap;
2364         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2365         
2366         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2367         if (ret != 0) {
2368                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2369                 return;
2370         }
2371
2372         for (i=0;i<nodemap->num;i++) {
2373                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2374                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2375                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2376                                                  nodemap->nodes[i].pnn, 0,
2377                                                  NODE_FLAGS_BANNED);
2378                         if (ret != 0) {
2379                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2380                         }
2381                 }
2382         }
2383
2384         talloc_free(tmp_ctx);
2385 }
2386
2387
2388 /*
2389   we think we are winning the election - send a broadcast election request
2390  */
2391 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2392 {
2393         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2394         int ret;
2395
2396         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2397         if (ret != 0) {
2398                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2399         }
2400
2401         talloc_free(rec->send_election_te);
2402         rec->send_election_te = NULL;
2403 }
2404
2405 /*
2406   handler for memory dumps
2407 */
2408 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2409                              TDB_DATA data, void *private_data)
2410 {
2411         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2412         TDB_DATA *dump;
2413         int ret;
2414         struct srvid_request *rd;
2415
2416         if (data.dsize != sizeof(struct srvid_request)) {
2417                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2418                 talloc_free(tmp_ctx);
2419                 return;
2420         }
2421         rd = (struct srvid_request *)data.dptr;
2422
2423         dump = talloc_zero(tmp_ctx, TDB_DATA);
2424         if (dump == NULL) {
2425                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2426                 talloc_free(tmp_ctx);
2427                 return;
2428         }
2429         ret = ctdb_dump_memory(ctdb, dump);
2430         if (ret != 0) {
2431                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2432                 talloc_free(tmp_ctx);
2433                 return;
2434         }
2435
2436         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2437
2438         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2439         if (ret != 0) {
2440                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2441                 talloc_free(tmp_ctx);
2442                 return;
2443         }
2444
2445         talloc_free(tmp_ctx);
2446 }
2447
2448 /*
2449   handler for reload_nodes
2450 */
2451 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2452                              TDB_DATA data, void *private_data)
2453 {
2454         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2455
2456         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2457
2458         ctdb_load_nodes_file(rec->ctdb);
2459 }
2460
2461
2462 static void ctdb_rebalance_timeout(struct event_context *ev,
2463                                    struct timed_event *te,
2464                                    struct timeval t, void *p)
2465 {
2466         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2467
2468         if (rec->force_rebalance_nodes == NULL) {
2469                 DEBUG(DEBUG_ERR,
2470                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2471                 return;
2472         }
2473
2474         DEBUG(DEBUG_NOTICE,
2475               ("Rebalance timeout occurred - do takeover run\n"));
2476         do_takeover_run(rec, rec->nodemap, false);
2477 }
2478
2479         
2480 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2481                                         uint64_t srvid,
2482                                         TDB_DATA data, void *private_data)
2483 {
2484         uint32_t pnn;
2485         uint32_t *t;
2486         int len;
2487         uint32_t deferred_rebalance;
2488         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2489
2490         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2491                 return;
2492         }
2493
2494         if (data.dsize != sizeof(uint32_t)) {
2495                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2496                 return;
2497         }
2498
2499         pnn = *(uint32_t *)&data.dptr[0];
2500
2501         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2502
2503         /* Copy any existing list of nodes.  There's probably some
2504          * sort of realloc variant that will do this but we need to
2505          * make sure that freeing the old array also cancels the timer
2506          * event for the timeout... not sure if realloc will do that.
2507          */
2508         len = (rec->force_rebalance_nodes != NULL) ?
2509                 talloc_array_length(rec->force_rebalance_nodes) :
2510                 0;
2511
2512         /* This allows duplicates to be added but they don't cause
2513          * harm.  A call to add a duplicate PNN arguably means that
2514          * the timeout should be reset, so this is the simplest
2515          * solution.
2516          */
2517         t = talloc_zero_array(rec, uint32_t, len+1);
2518         CTDB_NO_MEMORY_VOID(ctdb, t);
2519         if (len > 0) {
2520                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2521         }
2522         t[len] = pnn;
2523
2524         talloc_free(rec->force_rebalance_nodes);
2525
2526         rec->force_rebalance_nodes = t;
2527
2528         /* If configured, setup a deferred takeover run to make sure
2529          * that certain nodes get IPs rebalanced to them.  This will
2530          * be cancelled if a successful takeover run happens before
2531          * the timeout.  Assign tunable value to variable for
2532          * readability.
2533          */
2534         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2535         if (deferred_rebalance != 0) {
2536                 event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2537                                 timeval_current_ofs(deferred_rebalance, 0),
2538                                 ctdb_rebalance_timeout, rec);
2539         }
2540 }
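
/* Design note: the deferred-rebalance timer above is deliberately
   parented on rec->force_rebalance_nodes rather than on rec.  That is
   what makes the copy-instead-of-realloc dance work: when a successful
   takeover run does TALLOC_FREE(rec->force_rebalance_nodes), talloc
   tears the timer down together with the array, cancelling the
   deferred rebalance automatically.
 */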
2541
2542
2543
2544 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2545                              TDB_DATA data, void *private_data)
2546 {
2547         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2548         struct ctdb_public_ip *ip;
2549
2550         if (rec->recmaster != rec->ctdb->pnn) {
2551                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2552                 return;
2553         }
2554
2555         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2556                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2557                 return;
2558         }
2559
2560         ip = (struct ctdb_public_ip *)data.dptr;
2561
2562         update_ip_assignment_tree(rec->ctdb, ip);
2563 }
2564
2565 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2566                                     TDB_DATA data,
2567                                     struct ctdb_op_state *op_state)
2568 {
2569         struct srvid_request_data *r;
2570         uint32_t timeout;
2571         TDB_DATA result;
2572         int32_t ret = 0;
2573
2574         /* Validate input data */
2575         if (data.dsize != sizeof(struct srvid_request_data)) {
2576                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2577                                  "expecting %lu\n", (long unsigned)data.dsize,
2578                                  (long unsigned)sizeof(struct srvid_request_data)));
2579                 return;
2580         }
2581         if (data.dptr == NULL) {
2582                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2583                 return;
2584         }
2585
2586         r = (struct srvid_request_data *)data.dptr;
2587         timeout = r->data;
2588
2589         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2590         if (ret != 0) {
2591                 goto done;
2592         }
2593
2594         /* Returning our PNN tells the caller that we succeeded */
2595         ret = ctdb_get_pnn(ctdb);
2596 done:
2597         result.dsize = sizeof(int32_t);
2598         result.dptr  = (uint8_t *)&ret;
2599         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2600 }
2601
2602 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2603                                           uint64_t srvid, TDB_DATA data,
2604                                           void *private_data)
2605 {
2606         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2607                                                     struct ctdb_recoverd);
2608
2609         srvid_disable_and_reply(ctdb, data, rec->takeover_run);
2610 }
2611
2612 /* Backward compatibility for this SRVID */
2613 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2614                                      TDB_DATA data, void *private_data)
2615 {
2616         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2617                                                     struct ctdb_recoverd);
2618         uint32_t timeout;
2619
2620         if (data.dsize != sizeof(uint32_t)) {
2621                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2622                                  "expecting %lu\n", (long unsigned)data.dsize,
2623                                  (long unsigned)sizeof(uint32_t)));
2624                 return;
2625         }
2626         if (data.dptr == NULL) {
2627                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2628                 return;
2629         }
2630
2631         timeout = *((uint32_t *)data.dptr);
2632
2633         ctdb_op_disable(rec->takeover_run, ctdb->ev, timeout);
2634 }
2635
2636 static void disable_recoveries_handler(struct ctdb_context *ctdb,
2637                                        uint64_t srvid, TDB_DATA data,
2638                                        void *private_data)
2639 {
2640         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2641                                                     struct ctdb_recoverd);
2642
2643         srvid_disable_and_reply(ctdb, data, rec->recovery);
2644 }
2645
2646 /*
2647   handler for ip reallocate; just add it to the list of requests and
2648   handle it later in the monitor_cluster loop so we do not recurse
2649   into takeover_run() while servicing other requests
2650 */
2651 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2652                                   TDB_DATA data, void *private_data)
2653 {
2654         struct srvid_request *request;
2655         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2656                                                     struct ctdb_recoverd);
2657
2658         if (data.dsize != sizeof(struct srvid_request)) {
2659                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2660                 return;
2661         }
2662
2663         request = (struct srvid_request *)data.dptr;
2664
2665         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2666 }
2667
2668 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2669                                           struct ctdb_recoverd *rec)
2670 {
2671         TDB_DATA result;
2672         int32_t ret;
2673         uint32_t culprit;
2674         struct srvid_requests *current;
2675
2676         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2677
2678         /* Only process requests that are currently pending.  More
2679          * might come in while the takeover run is in progress and
2680          * they will need to be processed later since they might
2681          * be in response to flag changes.
2682          */
2683         current = rec->reallocate_requests;
2684         rec->reallocate_requests = NULL;
2685
2686         /* update the list of public ips that a node can handle for
2687            all connected nodes
2688         */
2689         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2690         if (ret != 0) {
2691                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2692                                  culprit));
2693                 rec->need_takeover_run = true;
2694         }
2695         if (ret == 0) {
2696                 if (do_takeover_run(rec, rec->nodemap, false)) {
2697                         ret = ctdb_get_pnn(ctdb);
2698                 } else {
2699                         ret = -1;
2700                 }
2701         }
2702
2703         result.dsize = sizeof(int32_t);
2704         result.dptr  = (uint8_t *)&ret;
2705
2706         srvid_requests_reply(ctdb, &current, result);
2707 }
2708
2709
2710 /*
2711   handler for recovery master elections
2712 */
2713 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2714                              TDB_DATA data, void *private_data)
2715 {
2716         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2717         int ret;
2718         struct election_message *em = (struct election_message *)data.dptr;
2719         TALLOC_CTX *mem_ctx;
2720
2721         /* Ignore election packets from ourselves */
2722         if (ctdb->pnn == em->pnn) {
2723                 return;
2724         }
2725
2726         /* we got an election packet - update the timeout for the election */
2727         talloc_free(rec->election_timeout);
2728         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2729                                                 fast_start ?
2730                                                 timeval_current_ofs(0, 500000) :
2731                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2732                                                 ctdb_election_timeout, rec);
2733
2734         mem_ctx = talloc_new(ctdb);
2735
2736         /* someone called an election. check their election data
2737            and if we disagree and we would rather be the elected node, 
2738            send a new election message to all other nodes
2739          */
2740         if (ctdb_election_win(rec, em)) {
2741                 if (!rec->send_election_te) {
2742                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2743                                                                 timeval_current_ofs(0, 500000),
2744                                                                 election_send_request, rec);
2745                 }
2746                 talloc_free(mem_ctx);
2747                 /*unban_all_nodes(ctdb);*/
2748                 return;
2749         }
2750
2751         /* we didn't win */
2752         talloc_free(rec->send_election_te);
2753         rec->send_election_te = NULL;
2754
2755         if (ctdb->recovery_lock_file != NULL) {
2756                 /* Release the recovery lock file */
2757                 if (em->pnn != ctdb->pnn &&
2758                     ctdb_recovery_have_lock(ctdb)) {
2759                         ctdb_recovery_unlock(ctdb);
2760                         unban_all_nodes(ctdb);
2761                 }
2762         }
2763
2764         /* ok, let that guy become recmaster then */
2765         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2766         if (ret != 0) {
2767                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
2768                 talloc_free(mem_ctx);
2769                 return;
2770         }
2771
2772         talloc_free(mem_ctx);
2773         return;
2774 }
2775
2776
2777 /*
2778   force the start of the election process
2779  */
2780 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2781                            struct ctdb_node_map *nodemap)
2782 {
2783         int ret;
2784         struct ctdb_context *ctdb = rec->ctdb;
2785
2786         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2787
2788         /* set all nodes to recovery mode to stop all internode traffic */
2789         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2790         if (ret != 0) {
2791                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2792                 return;
2793         }
2794
2795         talloc_free(rec->election_timeout);
2796         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2797                                                 fast_start ?
2798                                                 timeval_current_ofs(0, 500000) :
2799                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2800                                                 ctdb_election_timeout, rec);
2801
2802         ret = send_election_request(rec, pnn);
2803         if (ret!=0) {
2804                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2805                 return;
2806         }
2807
2808         /* wait for a few seconds to collect all responses */
2809         ctdb_wait_election(rec);
2810 }
2811
2812
2813
2814 /*
2815   handler for when a node changes its flags
2816 */
2817 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2818                             TDB_DATA data, void *private_data)
2819 {
2820         int ret;
2821         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2822         struct ctdb_node_map *nodemap=NULL;
2823         TALLOC_CTX *tmp_ctx;
2824         int i;
2825         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2826         int disabled_flag_changed;
2827
2828         if (data.dsize != sizeof(*c)) {
2829                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2830                 return;
2831         }
2832
2833         tmp_ctx = talloc_new(ctdb);
2834         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2835
2836         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2837         if (ret != 0) {
2838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2839                 talloc_free(tmp_ctx);
2840                 return;
2841         }
2842
2843
2844         for (i=0;i<nodemap->num;i++) {
2845                 if (nodemap->nodes[i].pnn == c->pnn) break;
2846         }
2847
2848         if (i == nodemap->num) {
2849                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2850                 talloc_free(tmp_ctx);
2851                 return;
2852         }
2853
2854         if (c->old_flags != c->new_flags) {
2855                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2856         }
2857
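        /* Note whether the DISABLED flag bits differ between our cached
           nodemap and the new flags. */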
2858         disabled_flag_changed = (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2859
2860         nodemap->nodes[i].flags = c->new_flags;
2861
2862         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2863                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2864
2865         if (ret == 0) {
2866                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2867                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2868         }
2869         
2870         if (ret == 0 &&
2871             ctdb->recovery_master == ctdb->pnn &&
2872             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2873                 /* Only do the takeover run if the permanently-disabled or
2874                    unhealthy flags changed, since these cause an IP failover
2875                    but not a recovery.
2876                    If the node became disconnected or banned, that also
2877                    leads to an IP address failover, but it is handled
2878                    during recovery.
2879                 */
2880                 if (disabled_flag_changed) {
2881                         rec->need_takeover_run = true;
2882                 }
2883         }
2884
2885         talloc_free(tmp_ctx);
2886 }
2887
2888 /*
2889   handler for when we need to push out flag changes to all other nodes
2890 */
2891 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2892                             TDB_DATA data, void *private_data)
2893 {
2894         int ret;
2895         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2896         struct ctdb_node_map *nodemap=NULL;
2897         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2898         uint32_t recmaster;
2899         uint32_t *nodes;
2900
2901         /* find the recovery master */
2902         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2903         if (ret != 0) {
2904                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2905                 talloc_free(tmp_ctx);
2906                 return;
2907         }
2908
2909         /* read the node flags from the recmaster */
2910         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2911         if (ret != 0) {
2912                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2913                 talloc_free(tmp_ctx);
2914                 return;
2915         }
2916         if (c->pnn >= nodemap->num) {
2917                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2918                 talloc_free(tmp_ctx);
2919                 return;
2920         }
2921
2922         /* send the flags update to all connected nodes */
2923         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2924
2925         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2926                                       nodes, 0, CONTROL_TIMEOUT(),
2927                                       false, data,
2928                                       NULL, NULL,
2929                                       NULL) != 0) {
2930                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2931
2932                 talloc_free(tmp_ctx);
2933                 return;
2934         }
2935
2936         talloc_free(tmp_ctx);
2937 }
2938
2939
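/* Shared state for the async GET_RECMODE controls sent out by
   verify_recmode(): the number of replies still outstanding and the
   aggregated monitoring result. */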
2940 struct verify_recmode_normal_data {
2941         uint32_t count;
2942         enum monitor_result status;
2943 };
2944
2945 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2946 {
2947         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2948
2949
2950         /* one more node has responded with recmode data */
2951         rmdata->count--;
2952
2953         /* if we failed to get the recmode, then return an error and let
2954            the main loop try again.
2955         */
2956         if (state->state != CTDB_CONTROL_DONE) {
2957                 if (rmdata->status == MONITOR_OK) {
2958                         rmdata->status = MONITOR_FAILED;
2959                 }
2960                 return;
2961         }
2962
2963         /* if we got a response, then the recmode will be stored in the
2964            status field
2965         */
2966         if (state->status != CTDB_RECOVERY_NORMAL) {
2967                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2968                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2969         }
2970
2971         return;
2972 }
2973
2974
2975 /* verify that all nodes are in normal recovery mode */
2976 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2977 {
2978         struct verify_recmode_normal_data *rmdata;
2979         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2980         struct ctdb_client_control_state *state;
2981         enum monitor_result status;
2982         int j;
2983         
2984         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2985         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2986         rmdata->count  = 0;
2987         rmdata->status = MONITOR_OK;
2988
2989         /* loop over all active nodes and send an async getrecmode call
2990            to them */
2991         for (j=0; j<nodemap->num; j++) {
2992                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2993                         continue;
2994                 }
2995                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2996                                         CONTROL_TIMEOUT(), 
2997                                         nodemap->nodes[j].pnn);
2998                 if (state == NULL) {
2999                         /* we failed to send the control, treat this as 
3000                            an error and try again next iteration
3001                         */                      
3002                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
3003                         talloc_free(mem_ctx);
3004                         return MONITOR_FAILED;
3005                 }
3006
3007                 /* set up the callback functions */
3008                 state->async.fn = verify_recmode_normal_callback;
3009                 state->async.private_data = rmdata;
3010
3011                 /* one more control to wait for to complete */
3012                 rmdata->count++;
3013         }
3014
3015
3016         /* now wait for up to the maximum number of seconds allowed
3017            or until all nodes we expect a response from have replied
3018         */
3019         while (rmdata->count > 0) {
3020                 event_loop_once(ctdb->ev);
3021         }
3022
3023         status = rmdata->status;
3024         talloc_free(mem_ctx);
3025         return status;
3026 }
3027
3028
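/* Shared state for the async GET_RECMASTER controls sent out by
   verify_recmaster(): the number of replies still outstanding, the
   PNN every node is expected to report, and the aggregated result. */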
3029 struct verify_recmaster_data {
3030         struct ctdb_recoverd *rec;
3031         uint32_t count;
3032         uint32_t pnn;
3033         enum monitor_result status;
3034 };
3035
3036 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3037 {
3038         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3039
3040
3041         /* one more node has responded with recmaster data */
3042         rmdata->count--;
3043
3044         /* if we failed to get the recmaster, then return an error and let
3045            the main loop try again.
3046         */
3047         if (state->state != CTDB_CONTROL_DONE) {
3048                 if (rmdata->status == MONITOR_OK) {
3049                         rmdata->status = MONITOR_FAILED;
3050                 }
3051                 return;
3052         }
3053
3054         /* if we got a response, then the recmaster will be stored in the
3055            status field
3056         */
3057         if (state->status != rmdata->pnn) {
3058                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3059                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3060                 rmdata->status = MONITOR_ELECTION_NEEDED;
3061         }
3062
3063         return;
3064 }
3065
3066
3067 /* verify that all nodes agree that we are the recmaster */
3068 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3069 {
3070         struct ctdb_context *ctdb = rec->ctdb;
3071         struct verify_recmaster_data *rmdata;
3072         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3073         struct ctdb_client_control_state *state;
3074         enum monitor_result status;
3075         int j;
3076         
3077         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3078         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3079         rmdata->rec    = rec;
3080         rmdata->count  = 0;
3081         rmdata->pnn    = pnn;
3082         rmdata->status = MONITOR_OK;
3083
3084         /* loop over all active nodes and send an async getrecmaster call
3085            to them */
3086         for (j=0; j<nodemap->num; j++) {
3087                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3088                         continue;
3089                 }
3090                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3091                                         CONTROL_TIMEOUT(),
3092                                         nodemap->nodes[j].pnn);
3093                 if (state == NULL) {
3094                         /* we failed to send the control, treat this as 
3095                            an error and try again next iteration
3096                         */                      
3097                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3098                         talloc_free(mem_ctx);
3099                         return MONITOR_FAILED;
3100                 }
3101
3102                 /* set up the callback functions */
3103                 state->async.fn = verify_recmaster_callback;
3104                 state->async.private_data = rmdata;
3105
3106                 /* one more control to wait for to complete */
3107                 rmdata->count++;
3108         }
3109
3110
3111         /* now wait for up to the maximum number of seconds allowed
3112            or until all nodes we expect a response from have replied
3113         */
3114         while (rmdata->count > 0) {
3115                 event_loop_once(ctdb->ev);
3116         }
3117
3118         status = rmdata->status;
3119         talloc_free(mem_ctx);
3120         return status;
3121 }
3122
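/* Compare the local node's current interface list (names and link
   states) against the copy cached in rec->ifaces, and refresh the
   cache.  Returns true if anything changed, or if the list could
   not be read. */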
3123 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3124                                     struct ctdb_recoverd *rec)
3125 {
3126         struct ctdb_control_get_ifaces *ifaces = NULL;
3127         TALLOC_CTX *mem_ctx;
3128         bool ret = false;
3129
3130         mem_ctx = talloc_new(NULL);
3131
3132         /* Read the interfaces from the local node */
3133         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3134                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3135                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3136                 /* We could return an error.  However, this will be
3137                  * rare so we'll decide that the interfaces have
3138                  * actually changed, just in case.
3139                  */
3140                 talloc_free(mem_ctx);
3141                 return true;
3142         }
3143
3144         if (!rec->ifaces) {
3145                 /* We haven't been here before so things have changed */
3146                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3147                 ret = true;
3148         } else if (rec->ifaces->num != ifaces->num) {
3149                 /* Number of interfaces has changed */
3150                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3151                                      rec->ifaces->num, ifaces->num));
3152                 ret = true;
3153         } else {
3154                 /* See if interface names or link states have changed */
3155                 int i;
3156                 for (i = 0; i < rec->ifaces->num; i++) {
3157                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3158                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3159                                 DEBUG(DEBUG_NOTICE,
3160                                       ("Interface in slot %d changed: %s => %s\n",
3161                                        i, iface->name, ifaces->ifaces[i].name));
3162                                 ret = true;
3163                                 break;
3164                         }
3165                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3166                                 DEBUG(DEBUG_NOTICE,
3167                                       ("Interface %s changed state: %d => %d\n",
3168                                        iface->name, iface->link_state,
3169                                        ifaces->ifaces[i].link_state));
3170                                 ret = true;
3171                                 break;
3172                         }
3173                 }
3174         }
3175
3176         talloc_free(rec->ifaces);
3177         rec->ifaces = talloc_steal(rec, ifaces);
3178
3179         talloc_free(mem_ctx);
3180         return ret;
3181 }
3182
3183 /* called to check that the local allocation of public ip addresses is ok.
3184 */
3185 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3186 {
3187         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3188         struct ctdb_uptime *uptime1 = NULL;
3189         struct ctdb_uptime *uptime2 = NULL;
3190         int ret, j;
3191         bool need_takeover_run = false;
3192
3193         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3194                                 CTDB_CURRENT_NODE, &uptime1);
3195         if (ret != 0) {
3196                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3197                 talloc_free(mem_ctx);
3198                 return -1;
3199         }
3200
3201         if (interfaces_have_changed(ctdb, rec)) {
3202                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
3203                                      "local node %u - force takeover run\n",
3204                                      pnn));
3205                 need_takeover_run = true;
3206         }
3207
3208         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3209                                 CTDB_CURRENT_NODE, &uptime2);
3210         if (ret != 0) {
3211                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3212                 talloc_free(mem_ctx);
3213                 return -1;
3214         }
3215
3216         /* skip the check if the startrecovery time has changed */
3217         if (timeval_compare(&uptime1->last_recovery_started,
3218                             &uptime2->last_recovery_started) != 0) {
3219                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3220                 talloc_free(mem_ctx);
3221                 return 0;
3222         }
3223
3224         /* skip the check if the endrecovery time has changed */
3225         if (timeval_compare(&uptime1->last_recovery_finished,
3226                             &uptime2->last_recovery_finished) != 0) {
3227                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3228                 talloc_free(mem_ctx);
3229                 return 0;
3230         }
3231
3232         /* skip the check if we have started but not finished recovery */
3233         if (timeval_compare(&uptime1->last_recovery_finished,
3234                             &uptime1->last_recovery_started) != 1) {
3235                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3236                 talloc_free(mem_ctx);
3237
3238                 return 0;
3239         }
3240
3241         /* Verify that we have the IP addresses we should have and
3242            don't have ones we shouldn't have.
3243            If we find an inconsistency we ask the recovery master
3244            (via CTDB_SRVID_TAKEOVER_RUN) to perform a takeover run,
3245            which will reassign the public IP addresses.
3246            Also, if an IP's pnn is -1 and we are healthy and can host
3247            it, we request an IP reallocation.
3248         */
3249         if (ctdb->tunable.disable_ip_failover == 0) {
3250                 struct ctdb_all_public_ips *ips = NULL;
3251
3252                 /* read the *available* IPs from the local node */
3253                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3254                 if (ret != 0) {
3255                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3256                         talloc_free(mem_ctx);
3257                         return -1;
3258                 }
3259
3260                 for (j=0; j<ips->num; j++) {
3261                         if (ips->ips[j].pnn == -1 &&
3262                             nodemap->nodes[pnn].flags == 0) {
3263                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3264                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3265                                 need_takeover_run = true;
3266                         }
3267                 }
3268
3269                 talloc_free(ips);
3270
3271                 /* read the *known* IPs from the local node */
3272                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3273                 if (ret != 0) {
3274                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3275                         talloc_free(mem_ctx);
3276                         return -1;
3277                 }
3278
3279                 for (j=0; j<ips->num; j++) {
3280                         if (ips->ips[j].pnn == pnn) {
3281                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3282                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3283                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3284                                         need_takeover_run = true;
3285                                 }
3286                         } else {
3287                                 if (ctdb->do_checkpublicip &&
3288                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3289
3290                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3291                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3292
3293                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3294                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3295                                         }
3296                                 }
3297                         }
3298                 }
3299         }
3300
3301         if (need_takeover_run) {
3302                 struct srvid_request rd;
3303                 TDB_DATA data;
3304
3305                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3306
3307                 rd.pnn = ctdb->pnn;
3308                 rd.srvid = 0;
3309                 data.dptr = (uint8_t *)&rd;
3310                 data.dsize = sizeof(rd);
3311
3312                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3313                 if (ret != 0) {
3314                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
3315                 }
3316         }
3317         talloc_free(mem_ctx);
3318         return 0;
3319 }
3320
3321
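/* Callback for the async GET_NODEMAP control: stash each remote
   node's nodemap in the array passed via callback_data, indexed by
   the replying node's PNN. */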
3322 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3323 {
3324         struct ctdb_node_map **remote_nodemaps = callback_data;
3325
3326         if (node_pnn >= ctdb->num_nodes) {
3327                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3328                 return;
3329         }
3330
3331         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3332
3333 }
3334
3335 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3336         struct ctdb_node_map *nodemap,
3337         struct ctdb_node_map **remote_nodemaps)
3338 {
3339         uint32_t *nodes;
3340
3341         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3342         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3343                                         nodes, 0,
3344                                         CONTROL_TIMEOUT(), false, tdb_null,
3345                                         async_getnodemap_callback,
3346                                         NULL,
3347                                         remote_nodemaps) != 0) {
3348                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3349
3350                 return -1;
3351         }
3352
3353         return 0;
3354 }
3355
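/* Re-read the reclock file setting from the main daemon and apply
   any change locally, releasing a lock that was taken under the old
   setting. */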
3356 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3357 {
3358         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3359         const char *reclockfile;
3360
3361         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3362                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3363                 talloc_free(tmp_ctx);
3364                 return -1;      
3365         }
3366
3367         if (reclockfile == NULL) {
3368                 if (ctdb->recovery_lock_file != NULL) {
3369                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3370                         talloc_free(ctdb->recovery_lock_file);
3371                         ctdb->recovery_lock_file = NULL;
3372                         ctdb_recovery_unlock(ctdb);
3373                 }
3374                 talloc_free(tmp_ctx);
3375                 return 0;
3376         }
3377
3378         if (ctdb->recovery_lock_file == NULL) {
3379                 DEBUG(DEBUG_NOTICE,
3380                       ("Recovery lock file enabled (%s)\n", reclockfile));
3381                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3382                 ctdb_recovery_unlock(ctdb);
3383                 talloc_free(tmp_ctx);
3384                 return 0;
3385         }
3386
3387
3388         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3389                 talloc_free(tmp_ctx);
3390                 return 0;
3391         }
3392
3393         DEBUG(DEBUG_NOTICE,
3394               ("Recovery lock file changed (now %s)\n", reclockfile));
3395         talloc_free(ctdb->recovery_lock_file);
3396         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3397         ctdb_recovery_unlock(ctdb);
3398
3399         talloc_free(tmp_ctx);
3400         return 0;
3401 }
3402
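/* One pass of the recovery daemon's monitoring logic: check that the
   local daemon is healthy, that all nodes agree on the recmaster and
   recovery mode, and that nodemaps and vnnmaps are consistent,
   forcing an election, recovery or takeover run when they are not. */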
3403 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3404                       TALLOC_CTX *mem_ctx)
3405 {
3406         uint32_t pnn;
3407         struct ctdb_node_map *nodemap=NULL;
3408         struct ctdb_node_map *recmaster_nodemap=NULL;
3409         struct ctdb_node_map **remote_nodemaps=NULL;
3410         struct ctdb_vnn_map *vnnmap=NULL;
3411         struct ctdb_vnn_map *remote_vnnmap=NULL;
3412         int32_t debug_level;
3413         int i, j, ret;
3414         bool self_ban;
3415
3416
3417         /* verify that the main daemon is still running */
3418         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3419                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3420                 exit(-1);
3421         }
3422
3423         /* ping the local daemon to tell it we are alive */
3424         ctdb_ctrl_recd_ping(ctdb);
3425
3426         if (rec->election_timeout) {
3427                 /* an election is in progress */
3428                 return;
3429         }
3430
3431         /* read the debug level from the parent and update locally */
3432         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3433         if (ret !=0) {
3434                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3435                 return;
3436         }
3437         DEBUGLEVEL = debug_level;
3438
3439         /* get relevant tunables */
3440         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3441         if (ret != 0) {
3442                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3443                 return;
3444         }
3445
3446         /* get runstate */
3447         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3448                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3449         if (ret != 0) {
3450                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3451                 return;
3452         }
3453
3454         /* get the current recovery lock file from the server */
3455         if (update_recovery_lock_file(ctdb) != 0) {
3456                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3457                 return;
3458         }
3459
3460         /* Make sure that if recovery lock verification becomes disabled,
3461            we close the file
3462         */
3463         if (ctdb->recovery_lock_file == NULL) {
3464                 ctdb_recovery_unlock(ctdb);
3465         }
3466
3467         pnn = ctdb_get_pnn(ctdb);
3468
3469         /* get the vnnmap */
3470         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3471         if (ret != 0) {
3472                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3473                 return;
3474         }
3475
3476
3477         /* get number of nodes */
3478         if (rec->nodemap) {
3479                 talloc_free(rec->nodemap);
3480                 rec->nodemap = NULL;
3481                 nodemap=NULL;
3482         }
3483         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3484         if (ret != 0) {
3485                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3486                 return;
3487         }
3488         nodemap = rec->nodemap;
3489
3490         /* remember our own node flags */
3491         rec->node_flags = nodemap->nodes[pnn].flags;
3492
3493         ban_misbehaving_nodes(rec, &self_ban);
3494         if (self_ban) {
3495                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3496                 return;
3497         }
3498
3499         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3500            also frozen and that the recmode is set to active.
3501         */
3502         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3503                 /* If this node has become inactive then we want to
3504                  * reduce the chances of it taking over the recovery
3505                  * master role when it becomes active again.  This
3506                  * helps to stabilise the recovery master role so that
3507                  * it stays on the most stable node.
3508                  */
3509                 rec->priority_time = timeval_current();
3510
3511                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3512                 if (ret != 0) {
3513                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3514                 }
3515                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3516                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3517
3518                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3519                         if (ret != 0) {
3520                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3521
3522                                 return;
3523                         }
3524                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3525                         if (ret != 0) {
3526                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3527                                 return;
3528                         }
3529                 }
3530
3531                 /* If this node is stopped or banned then it is not the recovery
3532                  * master, so don't do anything. This prevents stopped or banned
3533                  * node from starting election and sending unnecessary controls.
3534                  */
3535                 return;
3536         }
3537
3538         /* check which node is the recovery master */
3539         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3540         if (ret != 0) {
3541                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3542                 return;
3543         }
3544
3545         /* If we are not the recmaster then do some housekeeping */
3546         if (rec->recmaster != pnn) {
3547                 /* Ignore any IP reallocate requests - only recmaster
3548                  * processes them
3549                  */
3550                 TALLOC_FREE(rec->reallocate_requests);
3551                 /* Clear any nodes that should be force rebalanced in
3552                  * the next takeover run.  If the recovery master role
3553                  * has moved then we don't want to process these some
3554                  * time in the future.
3555                  */
3556                 TALLOC_FREE(rec->force_rebalance_nodes);
3557         }
3558
3559         /* This is a special case.  When the recovery daemon starts,
3560          * recmaster is set to -1.  If this node was not started in the
3561          * stopped state, start an election to decide the recovery master.
3562          */
3563         if (rec->recmaster == (uint32_t)-1) {
3564                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3565                 force_election(rec, pnn, nodemap);
3566                 return;
3567         }
3568
3569         /* update the capabilities for all nodes */
3570         ret = update_capabilities(rec, nodemap);
3571         if (ret != 0) {
3572                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3573                 return;
3574         }
3575
3576         /*
3577          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3578          * but we have, then force an election and try to become the new
3579          * recmaster.
3580          */
3581         if (!ctdb_node_has_capabilities(rec->caps,
3582                                         rec->recmaster,
3583                                         CTDB_CAP_RECMASTER) &&
3584             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3585             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3586                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3587                                   " but we (node %u) have - force an election\n",
3588                                   rec->recmaster, pnn));
3589                 force_election(rec, pnn, nodemap);
3590                 return;
3591         }
3592
3593         /* count the active, lmaster-capable and connected nodes */
3594         rec->num_active    = 0;
3595         rec->num_lmasters  = 0;
3596         rec->num_connected = 0;
3597         for (i=0; i<nodemap->num; i++) {
3598                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3599                         rec->num_active++;
3600                         if (ctdb_node_has_capabilities(rec->caps,
3601                                                        ctdb->nodes[i]->pnn,
3602                                                        CTDB_CAP_LMASTER)) {
3603                                 rec->num_lmasters++;
3604                         }
3605                 }
3606                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3607                         rec->num_connected++;
3608                 }
3609         }
3610
3611
3612         /* verify that the recmaster node is still active */
3613         for (j=0; j<nodemap->num; j++) {
3614                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3615                         break;
3616                 }
3617         }
3618
3619         if (j == nodemap->num) {
3620                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3621                 force_election(rec, pnn, nodemap);
3622                 return;
3623         }
3624
3625         /* if recovery master is disconnected we must elect a new recmaster */
3626         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3627                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3628                 force_election(rec, pnn, nodemap);
3629                 return;
3630         }
3631
3632         /* get nodemap from the recovery master to check if it is inactive */
3633         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3634                                    mem_ctx, &recmaster_nodemap);
3635         if (ret != 0) {
3636                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3637                           nodemap->nodes[j].pnn));
3638                 return;
3639         }
3640
3641
3642         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3643             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3644                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3645                 /*
3646                  * update our nodemap to carry the recmaster's notion of
3647                  * its own flags, so that we don't keep freezing the
3648                  * inactive recmaster node...
3649                  */
3650                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3651                 force_election(rec, pnn, nodemap);
3652                 return;
3653         }
3654
3655         /* verify that we have all the IP addresses we should have and
3656          * don't have addresses we shouldn't have.
3657          */
3658         if (ctdb->tunable.disable_ip_failover == 0 &&
3659             !ctdb_op_is_disabled(rec->takeover_run)) {
3660                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3661                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3662                 }
3663         }
3664
3665
3666         /* if we are not the recmaster then we do not need to check
3667            if recovery is needed
3668          */
3669         if (pnn != rec->recmaster) {
3670                 return;
3671         }
3672
3673
3674         /* ensure our local copies of flags are right */
3675         ret = update_local_flags(rec, nodemap);
3676         if (ret == MONITOR_ELECTION_NEEDED) {
3677                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3678                 force_election(rec, pnn, nodemap);
3679                 return;
3680         }
3681         if (ret != MONITOR_OK) {
3682                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3683                 return;
3684         }
3685
3686         if (ctdb->num_nodes != nodemap->num) {
3687                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3688                 ctdb_load_nodes_file(ctdb);
3689                 return;
3690         }
3691
3692         /* verify that all active nodes agree that we are the recmaster */
3693         switch (verify_recmaster(rec, nodemap, pnn)) {
3694         case MONITOR_RECOVERY_NEEDED:
3695                 /* cannot happen */
3696                 return;
3697         case MONITOR_ELECTION_NEEDED:
3698                 force_election(rec, pnn, nodemap);
3699                 return;
3700         case MONITOR_OK:
3701                 break;
3702         case MONITOR_FAILED:
3703                 return;
3704         }
3705
3706
3707         if (rec->need_recovery) {
3708                 /* a previous recovery didn't finish */
3709                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3710                 return;
3711         }
3712
3713         /* verify that all active nodes are in normal mode 
3714            and not in recovery mode 
3715         */
3716         switch (verify_recmode(ctdb, nodemap)) {
3717         case MONITOR_RECOVERY_NEEDED:
3718                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3719                 return;
3720         case MONITOR_FAILED:
3721                 return;
3722         case MONITOR_ELECTION_NEEDED:
3723                 /* cannot happen */
3724         case MONITOR_OK:
3725                 break;
3726         }
3727
3728
3729         if (ctdb->recovery_lock_file != NULL) {
3730                 /* We must already hold the recovery lock */
3731                 if (!ctdb_recovery_have_lock(ctdb)) {
3732                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3733                         ctdb_set_culprit(rec, ctdb->pnn);
3734                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3735                         return;
3736                 }
3737         }
3738
3739
3740         /* if there are takeovers requested, perform it and notify the waiters */
3741         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3742             rec->reallocate_requests) {
3743                 process_ipreallocate_requests(ctdb, rec);
3744         }
3745
3746         /* If recoveries are disabled then there is no use doing any
3747          * nodemap or flags checks.  Recoveries might be disabled due
3748          * to "reloadnodes", so doing these checks might cause an
3749          * unnecessary recovery.  */
3750         if (ctdb_op_is_disabled(rec->recovery)) {
3751                 return;
3752         }
3753
3754         /* get the nodemap for all active remote nodes
3755          */
3756         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3757         if (remote_nodemaps == NULL) {
3758                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3759                 return;
3760         }
3761         for(i=0; i<nodemap->num; i++) {
3762                 remote_nodemaps[i] = NULL;
3763         }
3764         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3765                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3766                 return;
3767         } 
3768
3769         /* verify that all other nodes have the same nodemap as we have
3770         */
3771         for (j=0; j<nodemap->num; j++) {
3772                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3773                         continue;
3774                 }
3775
3776                 if (remote_nodemaps[j] == NULL) {
3777                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3778                         ctdb_set_culprit(rec, j);
3779
3780                         return;
3781                 }
3782
3783                 /* if the nodes disagree on how many nodes there are
3784                    then this is a good reason to try recovery
3785                  */
3786                 if (remote_nodemaps[j]->num != nodemap->num) {
3787                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3788                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3789                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3790                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3791                         return;
3792                 }
3793
3794                 /* if the nodes disagree on which nodes exist and are
3795                    active, then that is also a good reason to do recovery
3796                  */
3797                 for (i=0;i<nodemap->num;i++) {
3798                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3799                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3800                                           nodemap->nodes[j].pnn, i, 
3801                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3802                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3803                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3804                                             vnnmap);
3805                                 return;
3806                         }
3807                 }
3808         }
3809
3810         /*
3811          * Update node flags obtained from each active node. This ensures we have
3812          * up-to-date information for all the nodes.
3813          */
3814         for (j=0; j<nodemap->num; j++) {
3815                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3816                         continue;
3817                 }
3818                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3819         }
3820
3821         for (j=0; j<nodemap->num; j++) {
3822                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3823                         continue;
3824                 }
3825
3826                 /* verify the flags are consistent
3827                 */
3828                 for (i=0; i<nodemap->num; i++) {
3829                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3830                                 continue;
3831                         }
3832                         
3833                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3834                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3835                                   nodemap->nodes[j].pnn, 
3836                                   nodemap->nodes[i].pnn, 
3837                                   remote_nodemaps[j]->nodes[i].flags,
3838                                   nodemap->nodes[i].flags));
3839                                 if (i == j) {
3840                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3841                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3842                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3843                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3844                                                     vnnmap);
3845                                         return;
3846                                 } else {
3847                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3848                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3849                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3850                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3851                                                     vnnmap);
3852                                         return;
3853                                 }
3854                         }
3855                 }
3856         }
3857
3858
3859         /* There must be the same number of lmasters in the vnn map as
3860          * there are active nodes with the lmaster capability...  or
3861          * do a recovery.
3862          */
3863         if (vnnmap->size != rec->num_lmasters) {
3864                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3865                           vnnmap->size, rec->num_lmasters));
3866                 ctdb_set_culprit(rec, ctdb->pnn);
3867                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3868                 return;
3869         }
3870
3871         /* verify that all active nodes in the nodemap also exist in 
3872            the vnnmap.
3873          */
3874         for (j=0; j<nodemap->num; j++) {
3875                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3876                         continue;
3877                 }
3878                 if (nodemap->nodes[j].pnn == pnn) {
3879                         continue;
3880                 }
3881
3882                 for (i=0; i<vnnmap->size; i++) {
3883                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3884                                 break;
3885                         }
3886                 }
3887                 if (i == vnnmap->size) {
3888                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3889                                   nodemap->nodes[j].pnn));
3890                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3891                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3892                         return;
3893                 }
3894         }
3895
3896         
3897         /* verify that all other nodes have the same vnnmap
3898            and are from the same generation
3899          */
3900         for (j=0; j<nodemap->num; j++) {
3901                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3902                         continue;
3903                 }
3904                 if (nodemap->nodes[j].pnn == pnn) {
3905                         continue;
3906                 }
3907
3908                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3909                                           mem_ctx, &remote_vnnmap);
3910                 if (ret != 0) {
3911                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3912                                   nodemap->nodes[j].pnn));
3913                         return;
3914                 }
3915
3916                 /* verify the vnnmap generation is the same */
3917                 if (vnnmap->generation != remote_vnnmap->generation) {
3918                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3919                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3920                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3921                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3922                         return;
3923                 }
3924
3925                 /* verify the vnnmap size is the same */
3926                 if (vnnmap->size != remote_vnnmap->size) {
3927                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3928                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3929                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3930                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3931                         return;
3932                 }
3933
3934                 /* verify the vnnmap is the same */
3935                 for (i=0;i<vnnmap->size;i++) {
3936                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3937                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3938                                           nodemap->nodes[j].pnn));
3939                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3940                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3941                                             vnnmap);
3942                                 return;
3943                         }
3944                 }
3945         }
3946
3947         /* we might need to change who has what IP assigned */
3948         if (rec->need_takeover_run) {
3949                 uint32_t culprit = (uint32_t)-1;
3950
3951                 rec->need_takeover_run = false;
3952
3953                 /* update the list of public ips that a node can handle for
3954                    all connected nodes
3955                 */
3956                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3957                 if (ret != 0) {
3958                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3959                                          culprit));
3960                         rec->need_takeover_run = true;
3961                         return;
3962                 }
3963
3964                 /* execute the "startrecovery" event script on all nodes */
3965                 ret = run_startrecovery_eventscript(rec, nodemap);
3966                 if (ret!=0) {
3967                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3968                         ctdb_set_culprit(rec, ctdb->pnn);
3969                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3970                         return;
3971                 }
3972
3973                 /* If the takeover run fails, the offending nodes are
3974                  * assigned ban culprit counts and the takeover is retried.
3975                  * If it fails repeatedly, the offending node gets banned.
3976                  *
3977                  * If rec->need_takeover_run is not set back to true on
3978                  * failure, monitoring stays disabled cluster-wide (it was
3979                  * disabled via the startrecovery eventscript) and will not
3980                  * be re-enabled.
3981                  */
3982                 if (!do_takeover_run(rec, nodemap, true)) {
3983                         return;
3984                 }
3985
3986                 /* execute the "recovered" event script on all nodes */
3987                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3988 #if 0
3989 // we can't check whether the event completed successfully
3990 // since this script WILL fail if the node is in recovery mode
3991 // and if that race happens, the code here would just cause a second
3992 // cascading recovery.
3993                 if (ret!=0) {
3994                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3995                         ctdb_set_culprit(rec, ctdb->pnn);
3996                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3997                 }
3998 #endif
3999         }
4000 }

/*
  the main monitoring loop
 */
static void monitor_cluster(struct ctdb_context *ctdb)
{
        struct ctdb_recoverd *rec;

        DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));

        rec = talloc_zero(ctdb, struct ctdb_recoverd);
        CTDB_NO_MEMORY_FATAL(ctdb, rec);

        rec->ctdb = ctdb;

        rec->takeover_run = ctdb_op_init(rec, "takeover runs");
        CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);

        rec->recovery = ctdb_op_init(rec, "recoveries");
        CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);

        rec->priority_time = timeval_current();

        /* register a message port for sending memory dumps */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);

        /* register a message port for recovery elections */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);

        /* when nodes are disabled/enabled */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);

        /* when we are asked to push out a flag change */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);

        /* register a message port for vacuum fetch */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);

        /* register a message port for reloadnodes */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);

        /* register a message port for performing a takeover run */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);

        /* register a message port for disabling the ip check for a short while */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);

        /* register a message port for updating the recovery daemon's node assignment for an IP */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);

        /* register a message port for forcing a rebalance of a node at the
           next reallocation */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);

        /* Register a message port for disabling takeover runs */
        ctdb_client_set_message_handler(ctdb,
                                        CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                        disable_takeover_runs_handler, rec);

        /* Register a message port for disabling recoveries */
        ctdb_client_set_message_handler(ctdb,
                                        CTDB_SRVID_DISABLE_RECOVERIES,
                                        disable_recoveries_handler, rec);

        /* register a message port for detaching a database */
        ctdb_client_set_message_handler(ctdb,
                                        CTDB_SRVID_DETACH_DATABASE,
                                        detach_database_handler, rec);

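        /* Main monitoring loop: each iteration runs one pass of main_loop()
         * and then waits until at least ctdb->tunable.recover_interval
         * seconds have passed since the pass started. */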
        for (;;) {
                TALLOC_CTX *mem_ctx = talloc_new(ctdb);
                struct timeval start;
                double elapsed;

                if (!mem_ctx) {
                        DEBUG(DEBUG_CRIT,(__location__
                                          " Failed to create temp context\n"));
                        exit(-1);
                }

                start = timeval_current();
                main_loop(ctdb, rec, mem_ctx);
                talloc_free(mem_ctx);

                /* we only check for recovery once every
                   recover_interval seconds */
                elapsed = timeval_elapsed(&start);
                if (elapsed < ctdb->tunable.recover_interval) {
                        ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
                                          - elapsed);
                }
        }
}

/*
  event handler for when the main ctdbd dies
 */
static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
                                 uint16_t flags, void *private_data)
{
        DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
        _exit(1);
}

/*
  called regularly to verify that the recovery daemon is still running
 */
static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
                            struct timeval yt, void *p)
{
        struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);

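        /* Signal 0 delivers nothing; it only tests whether the recovery
         * daemon process still exists. */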
        if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
                DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));

                event_add_timed(ctdb->ev, ctdb, timeval_zero(),
                                ctdb_restart_recd, ctdb);

                return;
        }

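        /* Still alive: re-arm this check to fire again in 30 seconds. */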
        event_add_timed(ctdb->ev, ctdb->recd_ctx,
                        timeval_current_ofs(30, 0),
                        ctdb_check_recd, ctdb);
}

static void recd_sig_child_handler(struct event_context *ev,
                                   struct signal_event *se, int signum,
                                   int count, void *dont_care,
                                   void *private_data)
{
//      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
        int status;
        pid_t pid = -1;

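        /* Reap every exited child without blocking: waitpid() returns 0
         * once no more children have changed state, and -1 with ECHILD
         * when there are no children left at all. */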
        while (pid != 0) {
                pid = waitpid(-1, &status, WNOHANG);
                if (pid == -1) {
                        if (errno != ECHILD) {
                                DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno), errno));
                        }
                        return;
                }
                if (pid > 0) {
                        DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
                }
        }
}

/*
  start up the recovery daemon as a child of the main ctdb daemon
 */
int ctdb_start_recoverd(struct ctdb_context *ctdb)
{
        int fd[2];
        struct signal_event *se;
        struct tevent_fd *fde;

        if (pipe(fd) != 0) {
                return -1;
        }

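        /* Fork the recovery daemon; as with fork(), this returns the
         * child's pid in the parent and 0 in the child. */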
        ctdb->recoverd_pid = ctdb_fork(ctdb);
        if (ctdb->recoverd_pid == -1) {
                return -1;
        }

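        /* Parent: close the read end but keep the write end of the pipe
         * open for as long as we live, so the child can detect our death
         * as EOF on the read end.  Also start the periodic liveness check
         * on the child. */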
        if (ctdb->recoverd_pid != 0) {
                talloc_free(ctdb->recd_ctx);
                ctdb->recd_ctx = talloc_new(ctdb);
                CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);

                close(fd[0]);
                event_add_timed(ctdb->ev, ctdb->recd_ctx,
                                timeval_current_ofs(30, 0),
                                ctdb_check_recd, ctdb);
                return 0;
        }

        close(fd[1]);

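        /* Child: re-seed the PRNG so the forked recovery daemon does not
         * reuse the parent's random sequence. */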
        srandom(getpid() ^ time(NULL));

        ctdb_set_process_name("ctdb_recoverd");
        if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
                DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);
        }

        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));

        fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
                           ctdb_recoverd_parent, &fd[0]);
        tevent_fd_set_auto_close(fde);

        /* set up a handler to pick up SIGCHLD */
        se = event_add_signal(ctdb->ev, ctdb,
                              SIGCHLD, 0,
                              recd_sig_child_handler,
                              ctdb);
        if (se == NULL) {
                DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
                exit(1);
        }

        monitor_cluster(ctdb);

        DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
        return -1;
}

/*
  shut down the recovery daemon
 */
void ctdb_stop_recoverd(struct ctdb_context *ctdb)
{
        if (ctdb->recoverd_pid == 0) {
                return;
        }

        DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
        ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);

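        /* Freeing recd_ctx also cancels the ctdb_check_recd timer that was
         * allocated on it, so the daemon we just killed is not immediately
         * restarted. */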
        TALLOC_FREE(ctdb->recd_ctx);
        TALLOC_FREE(ctdb->recd_ping_count);
}

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te,
                              struct timeval t, void *private_data)
{
        struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

        DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
        ctdb_stop_recoverd(ctdb);
        ctdb_start_recoverd(ctdb);
}