dont leak file descriptors
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         uint32_t recmaster;
45         uint32_t num_active;
46         uint32_t num_connected;
47         struct ctdb_node_map *nodemap;
48         uint32_t last_culprit;
49         uint32_t culprit_counter;
50         struct timeval first_recover_time;
51         struct ban_state **banned_nodes;
52         struct timeval priority_time;
53         bool need_takeover_run;
54         bool need_recovery;
55         uint32_t node_flags;
56         struct timed_event *send_election_te;
57         struct timed_event *election_timeout;
58         struct vacuum_info *vacuum_info;
59 };
60
61 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
62 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
63
64
65 /*
66   unban a node
67  */
68 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
69 {
70         struct ctdb_context *ctdb = rec->ctdb;
71
72         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
73
74         if (!ctdb_validate_pnn(ctdb, pnn)) {
75                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
76                 return;
77         }
78
79         /* If we are unbanning a different node then just pass the ban info on */
80         if (pnn != ctdb->pnn) {
81                 TDB_DATA data;
82                 int ret;
83                 
84                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
85
86                 data.dptr = (uint8_t *)&pnn;
87                 data.dsize = sizeof(uint32_t);
88
89                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
90                 if (ret != 0) {
91                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
92                         return;
93                 }
94
95                 return;
96         }
97
98         /* make sure we remember we are no longer banned in case 
99            there is an election */
100         rec->node_flags &= ~NODE_FLAGS_BANNED;
101
102         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
103         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
104
105         if (rec->banned_nodes[pnn] == NULL) {
106                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
107                 return;
108         }
109
110         talloc_free(rec->banned_nodes[pnn]);
111         rec->banned_nodes[pnn] = NULL;
112 }
113
114
115 /*
116   called when a ban has timed out
117  */
118 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
119 {
120         struct ban_state *state = talloc_get_type(p, struct ban_state);
121         struct ctdb_recoverd *rec = state->rec;
122         uint32_t pnn = state->banned_node;
123
124         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
125         ctdb_unban_node(rec, pnn);
126 }
127
128 /*
129   ban a node for a period of time
130  */
131 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
132 {
133         struct ctdb_context *ctdb = rec->ctdb;
134
135         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
136
137         if (!ctdb_validate_pnn(ctdb, pnn)) {
138                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
139                 return;
140         }
141
142         if (0 == ctdb->tunable.enable_bans) {
143                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
144                 return;
145         }
146
147         /* If we are banning a different node then just pass the ban info on */
148         if (pnn != ctdb->pnn) {
149                 struct ctdb_ban_info b;
150                 TDB_DATA data;
151                 int ret;
152                 
153                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
154
155                 b.pnn = pnn;
156                 b.ban_time = ban_time;
157
158                 data.dptr = (uint8_t *)&b;
159                 data.dsize = sizeof(b);
160
161                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
162                 if (ret != 0) {
163                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
164                         return;
165                 }
166
167                 return;
168         }
169
170         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
171         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
172
173         /* banning ourselves - lower our election priority */
174         rec->priority_time = timeval_current();
175
176         /* make sure we remember we are banned in case there is an 
177            election */
178         rec->node_flags |= NODE_FLAGS_BANNED;
179
180         if (rec->banned_nodes[pnn] != NULL) {
181                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
182                 talloc_free(rec->banned_nodes[pnn]);
183                 rec->banned_nodes[pnn] = NULL;
184         }
185
186         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
187         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
188
189         rec->banned_nodes[pnn]->rec = rec;
190         rec->banned_nodes[pnn]->banned_node = pnn;
191
192         if (ban_time != 0) {
193                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
194                                 timeval_current_ofs(ban_time, 0),
195                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
196         }
197 }
198
199 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
200
201
202 /*
203   run the "recovered" eventscript on all nodes
204  */
205 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
206 {
207         TALLOC_CTX *tmp_ctx;
208         uint32_t *nodes;
209
210         tmp_ctx = talloc_new(ctdb);
211         CTDB_NO_MEMORY(ctdb, tmp_ctx);
212
213         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
214         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
215                                         nodes,
216                                         CONTROL_TIMEOUT(), false, tdb_null,
217                                         NULL, NULL,
218                                         NULL) != 0) {
219                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
220
221                 talloc_free(tmp_ctx);
222                 return -1;
223         }
224
225         talloc_free(tmp_ctx);
226         return 0;
227 }
228
229 /*
230   remember the trouble maker
231  */
232 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
233 {
234         struct ctdb_context *ctdb = rec->ctdb;
235
236         if (rec->last_culprit != culprit ||
237             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
238                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
239                 /* either a new node is the culprit, or we've decided to forgive them */
240                 rec->last_culprit = culprit;
241                 rec->first_recover_time = timeval_current();
242                 rec->culprit_counter = 0;
243         }
244         rec->culprit_counter++;
245 }
246
247 /*
248   remember the trouble maker
249  */
250 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
251 {
252         struct ctdb_context *ctdb = rec->ctdb;
253
254         if (rec->last_culprit != culprit ||
255             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
256                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
257                 /* either a new node is the culprit, or we've decided to forgive them */
258                 rec->last_culprit = culprit;
259                 rec->first_recover_time = timeval_current();
260                 rec->culprit_counter = 0;
261         }
262         rec->culprit_counter += count;
263 }
264
265 /* this callback is called for every node that failed to execute the
266    start recovery event
267 */
268 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
269 {
270         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
271
272         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
273
274         ctdb_set_culprit(rec, node_pnn);
275 }
276
277 /*
278   run the "startrecovery" eventscript on all nodes
279  */
280 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
281 {
282         TALLOC_CTX *tmp_ctx;
283         uint32_t *nodes;
284         struct ctdb_context *ctdb = rec->ctdb;
285
286         tmp_ctx = talloc_new(ctdb);
287         CTDB_NO_MEMORY(ctdb, tmp_ctx);
288
289         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
290         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
291                                         nodes,
292                                         CONTROL_TIMEOUT(), false, tdb_null,
293                                         NULL,
294                                         startrecovery_fail_callback,
295                                         rec) != 0) {
296                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
297                 talloc_free(tmp_ctx);
298                 return -1;
299         }
300
301         talloc_free(tmp_ctx);
302         return 0;
303 }
304
305 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
306 {
307         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
308                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
309                 return;
310         }
311         if (node_pnn < ctdb->num_nodes) {
312                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
313         }
314 }
315
316 /*
317   update the node capabilities for all connected nodes
318  */
319 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
320 {
321         uint32_t *nodes;
322         TALLOC_CTX *tmp_ctx;
323
324         tmp_ctx = talloc_new(ctdb);
325         CTDB_NO_MEMORY(ctdb, tmp_ctx);
326
327         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
329                                         nodes, CONTROL_TIMEOUT(),
330                                         false, tdb_null,
331                                         async_getcap_callback, NULL,
332                                         NULL) != 0) {
333                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
334                 talloc_free(tmp_ctx);
335                 return -1;
336         }
337
338         talloc_free(tmp_ctx);
339         return 0;
340 }
341
342 /*
343   change recovery mode on all nodes
344  */
345 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
346 {
347         TDB_DATA data;
348         uint32_t *nodes;
349         TALLOC_CTX *tmp_ctx;
350
351         tmp_ctx = talloc_new(ctdb);
352         CTDB_NO_MEMORY(ctdb, tmp_ctx);
353
354         /* freeze all nodes */
355         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
356         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
357                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
358                                                 nodes, CONTROL_TIMEOUT(),
359                                                 false, tdb_null,
360                                                 NULL, NULL,
361                                                 NULL) != 0) {
362                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
363                         talloc_free(tmp_ctx);
364                         return -1;
365                 }
366         }
367
368
369         data.dsize = sizeof(uint32_t);
370         data.dptr = (unsigned char *)&rec_mode;
371
372         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
373                                         nodes, CONTROL_TIMEOUT(),
374                                         false, data,
375                                         NULL, NULL,
376                                         NULL) != 0) {
377                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
378                 talloc_free(tmp_ctx);
379                 return -1;
380         }
381
382         talloc_free(tmp_ctx);
383         return 0;
384 }
385
386 /*
387   change recovery master on all node
388  */
389 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
390 {
391         TDB_DATA data;
392         TALLOC_CTX *tmp_ctx;
393         uint32_t *nodes;
394
395         tmp_ctx = talloc_new(ctdb);
396         CTDB_NO_MEMORY(ctdb, tmp_ctx);
397
398         data.dsize = sizeof(uint32_t);
399         data.dptr = (unsigned char *)&pnn;
400
401         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
402         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
403                                         nodes,
404                                         CONTROL_TIMEOUT(), false, data,
405                                         NULL, NULL,
406                                         NULL) != 0) {
407                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
408                 talloc_free(tmp_ctx);
409                 return -1;
410         }
411
412         talloc_free(tmp_ctx);
413         return 0;
414 }
415
416
417 /*
418   ensure all other nodes have attached to any databases that we have
419  */
420 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
421                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
422 {
423         int i, j, db, ret;
424         struct ctdb_dbid_map *remote_dbmap;
425
426         /* verify that all other nodes have all our databases */
427         for (j=0; j<nodemap->num; j++) {
428                 /* we dont need to ourself ourselves */
429                 if (nodemap->nodes[j].pnn == pnn) {
430                         continue;
431                 }
432                 /* dont check nodes that are unavailable */
433                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
434                         continue;
435                 }
436
437                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
438                                          mem_ctx, &remote_dbmap);
439                 if (ret != 0) {
440                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
441                         return -1;
442                 }
443
444                 /* step through all local databases */
445                 for (db=0; db<dbmap->num;db++) {
446                         const char *name;
447
448
449                         for (i=0;i<remote_dbmap->num;i++) {
450                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
451                                         break;
452                                 }
453                         }
454                         /* the remote node already have this database */
455                         if (i!=remote_dbmap->num) {
456                                 continue;
457                         }
458                         /* ok so we need to create this database */
459                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
460                                             mem_ctx, &name);
461                         if (ret != 0) {
462                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
463                                 return -1;
464                         }
465                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
466                                            mem_ctx, name, dbmap->dbs[db].persistent);
467                         if (ret != 0) {
468                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
469                                 return -1;
470                         }
471                 }
472         }
473
474         return 0;
475 }
476
477
478 /*
479   ensure we are attached to any databases that anyone else is attached to
480  */
481 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
482                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
483 {
484         int i, j, db, ret;
485         struct ctdb_dbid_map *remote_dbmap;
486
487         /* verify that we have all database any other node has */
488         for (j=0; j<nodemap->num; j++) {
489                 /* we dont need to ourself ourselves */
490                 if (nodemap->nodes[j].pnn == pnn) {
491                         continue;
492                 }
493                 /* dont check nodes that are unavailable */
494                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
495                         continue;
496                 }
497
498                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
499                                          mem_ctx, &remote_dbmap);
500                 if (ret != 0) {
501                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
502                         return -1;
503                 }
504
505                 /* step through all databases on the remote node */
506                 for (db=0; db<remote_dbmap->num;db++) {
507                         const char *name;
508
509                         for (i=0;i<(*dbmap)->num;i++) {
510                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
511                                         break;
512                                 }
513                         }
514                         /* we already have this db locally */
515                         if (i!=(*dbmap)->num) {
516                                 continue;
517                         }
518                         /* ok so we need to create this database and
519                            rebuild dbmap
520                          */
521                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
522                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
523                         if (ret != 0) {
524                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
525                                           nodemap->nodes[j].pnn));
526                                 return -1;
527                         }
528                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
529                                            remote_dbmap->dbs[db].persistent);
530                         if (ret != 0) {
531                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
532                                 return -1;
533                         }
534                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
535                         if (ret != 0) {
536                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
537                                 return -1;
538                         }
539                 }
540         }
541
542         return 0;
543 }
544
545
546 /*
547   pull the remote database contents from one node into the recdb
548  */
549 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
550                                     struct tdb_wrap *recdb, uint32_t dbid)
551 {
552         int ret;
553         TDB_DATA outdata;
554         struct ctdb_marshall_buffer *reply;
555         struct ctdb_rec_data *rec;
556         int i;
557         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
558
559         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
560                                CONTROL_TIMEOUT(), &outdata);
561         if (ret != 0) {
562                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
563                 talloc_free(tmp_ctx);
564                 return -1;
565         }
566
567         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
568
569         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
570                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
571                 talloc_free(tmp_ctx);
572                 return -1;
573         }
574         
575         rec = (struct ctdb_rec_data *)&reply->data[0];
576         
577         for (i=0;
578              i<reply->count;
579              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
580                 TDB_DATA key, data;
581                 struct ctdb_ltdb_header *hdr;
582                 TDB_DATA existing;
583                 
584                 key.dptr = &rec->data[0];
585                 key.dsize = rec->keylen;
586                 data.dptr = &rec->data[key.dsize];
587                 data.dsize = rec->datalen;
588                 
589                 hdr = (struct ctdb_ltdb_header *)data.dptr;
590
591                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
592                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
593                         talloc_free(tmp_ctx);
594                         return -1;
595                 }
596
597                 /* fetch the existing record, if any */
598                 existing = tdb_fetch(recdb->tdb, key);
599                 
600                 if (existing.dptr != NULL) {
601                         struct ctdb_ltdb_header header;
602                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
603                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
604                                          (unsigned)existing.dsize, srcnode));
605                                 free(existing.dptr);
606                                 talloc_free(tmp_ctx);
607                                 return -1;
608                         }
609                         header = *(struct ctdb_ltdb_header *)existing.dptr;
610                         free(existing.dptr);
611                         if (!(header.rsn < hdr->rsn ||
612                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
613                                 continue;
614                         }
615                 }
616                 
617                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
618                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
619                         talloc_free(tmp_ctx);
620                         return -1;                              
621                 }
622         }
623
624         talloc_free(tmp_ctx);
625
626         return 0;
627 }
628
629 /*
630   pull all the remote database contents into the recdb
631  */
632 static int pull_remote_database(struct ctdb_context *ctdb,
633                                 struct ctdb_recoverd *rec, 
634                                 struct ctdb_node_map *nodemap, 
635                                 struct tdb_wrap *recdb, uint32_t dbid)
636 {
637         int j;
638
639         /* pull all records from all other nodes across onto this node
640            (this merges based on rsn)
641         */
642         for (j=0; j<nodemap->num; j++) {
643                 /* dont merge from nodes that are unavailable */
644                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
645                         continue;
646                 }
647                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
648                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
649                                  nodemap->nodes[j].pnn));
650                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
651                         return -1;
652                 }
653         }
654         
655         return 0;
656 }
657
658
659 /*
660   update flags on all active nodes
661  */
662 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
663 {
664         int ret;
665
666         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
667                 if (ret != 0) {
668                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
669                 return -1;
670         }
671
672         return 0;
673 }
674
675 /*
676   ensure all nodes have the same vnnmap we do
677  */
678 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
679                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
680 {
681         int j, ret;
682
683         /* push the new vnn map out to all the nodes */
684         for (j=0; j<nodemap->num; j++) {
685                 /* dont push to nodes that are unavailable */
686                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
687                         continue;
688                 }
689
690                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
691                 if (ret != 0) {
692                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
693                         return -1;
694                 }
695         }
696
697         return 0;
698 }
699
700
701 /*
702   handler for when the admin bans a node
703 */
704 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
705                         TDB_DATA data, void *private_data)
706 {
707         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
708         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
709         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
710
711         if (data.dsize != sizeof(*b)) {
712                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
713                 talloc_free(mem_ctx);
714                 return;
715         }
716
717         if (b->pnn != ctdb->pnn) {
718                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
719                 return;
720         }
721
722         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
723                  b->pnn, b->ban_time));
724
725         ctdb_ban_node(rec, b->pnn, b->ban_time);
726         talloc_free(mem_ctx);
727 }
728
729 /*
730   handler for when the admin unbans a node
731 */
732 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
733                           TDB_DATA data, void *private_data)
734 {
735         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
736         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
737         uint32_t pnn;
738
739         if (data.dsize != sizeof(uint32_t)) {
740                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
741                 talloc_free(mem_ctx);
742                 return;
743         }
744         pnn = *(uint32_t *)data.dptr;
745
746         if (pnn != ctdb->pnn) {
747                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
748                 return;
749         }
750
751         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
752         ctdb_unban_node(rec, pnn);
753         talloc_free(mem_ctx);
754 }
755
756
757 struct vacuum_info {
758         struct vacuum_info *next, *prev;
759         struct ctdb_recoverd *rec;
760         uint32_t srcnode;
761         struct ctdb_db_context *ctdb_db;
762         struct ctdb_marshall_buffer *recs;
763         struct ctdb_rec_data *r;
764 };
765
766 static void vacuum_fetch_next(struct vacuum_info *v);
767
768 /*
769   called when a vacuum fetch has completed - just free it and do the next one
770  */
771 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
772 {
773         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
774         talloc_free(state);
775         vacuum_fetch_next(v);
776 }
777
778
779 /*
780   process the next element from the vacuum list
781 */
782 static void vacuum_fetch_next(struct vacuum_info *v)
783 {
784         struct ctdb_call call;
785         struct ctdb_rec_data *r;
786
787         while (v->recs->count) {
788                 struct ctdb_client_call_state *state;
789                 TDB_DATA data;
790                 struct ctdb_ltdb_header *hdr;
791
792                 ZERO_STRUCT(call);
793                 call.call_id = CTDB_NULL_FUNC;
794                 call.flags = CTDB_IMMEDIATE_MIGRATION;
795
796                 r = v->r;
797                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
798                 v->recs->count--;
799
800                 call.key.dptr = &r->data[0];
801                 call.key.dsize = r->keylen;
802
803                 /* ensure we don't block this daemon - just skip a record if we can't get
804                    the chainlock */
805                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
806                         continue;
807                 }
808
809                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
810                 if (data.dptr == NULL) {
811                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
812                         continue;
813                 }
814
815                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
816                         free(data.dptr);
817                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
818                         continue;
819                 }
820                 
821                 hdr = (struct ctdb_ltdb_header *)data.dptr;
822                 if (hdr->dmaster == v->rec->ctdb->pnn) {
823                         /* its already local */
824                         free(data.dptr);
825                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
826                         continue;
827                 }
828
829                 free(data.dptr);
830
831                 state = ctdb_call_send(v->ctdb_db, &call);
832                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
833                 if (state == NULL) {
834                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
835                         talloc_free(v);
836                         return;
837                 }
838                 state->async.fn = vacuum_fetch_callback;
839                 state->async.private_data = v;
840                 return;
841         }
842
843         talloc_free(v);
844 }
845
846
847 /*
848   destroy a vacuum info structure
849  */
850 static int vacuum_info_destructor(struct vacuum_info *v)
851 {
852         DLIST_REMOVE(v->rec->vacuum_info, v);
853         return 0;
854 }
855
856
857 /*
858   handler for vacuum fetch
859 */
860 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
861                                  TDB_DATA data, void *private_data)
862 {
863         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
864         struct ctdb_marshall_buffer *recs;
865         int ret, i;
866         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
867         const char *name;
868         struct ctdb_dbid_map *dbmap=NULL;
869         bool persistent = false;
870         struct ctdb_db_context *ctdb_db;
871         struct ctdb_rec_data *r;
872         uint32_t srcnode;
873         struct vacuum_info *v;
874
875         recs = (struct ctdb_marshall_buffer *)data.dptr;
876         r = (struct ctdb_rec_data *)&recs->data[0];
877
878         if (recs->count == 0) {
879                 talloc_free(tmp_ctx);
880                 return;
881         }
882
883         srcnode = r->reqid;
884
885         for (v=rec->vacuum_info;v;v=v->next) {
886                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
887                         /* we're already working on records from this node */
888                         talloc_free(tmp_ctx);
889                         return;
890                 }
891         }
892
893         /* work out if the database is persistent */
894         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
895         if (ret != 0) {
896                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
897                 talloc_free(tmp_ctx);
898                 return;
899         }
900
901         for (i=0;i<dbmap->num;i++) {
902                 if (dbmap->dbs[i].dbid == recs->db_id) {
903                         persistent = dbmap->dbs[i].persistent;
904                         break;
905                 }
906         }
907         if (i == dbmap->num) {
908                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
909                 talloc_free(tmp_ctx);
910                 return;         
911         }
912
913         /* find the name of this database */
914         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
915                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
916                 talloc_free(tmp_ctx);
917                 return;
918         }
919
920         /* attach to it */
921         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
922         if (ctdb_db == NULL) {
923                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
924                 talloc_free(tmp_ctx);
925                 return;
926         }
927
928         v = talloc_zero(rec, struct vacuum_info);
929         if (v == NULL) {
930                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
931                 talloc_free(tmp_ctx);
932                 return;
933         }
934
935         v->rec = rec;
936         v->srcnode = srcnode;
937         v->ctdb_db = ctdb_db;
938         v->recs = talloc_memdup(v, recs, data.dsize);
939         if (v->recs == NULL) {
940                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
941                 talloc_free(v);
942                 talloc_free(tmp_ctx);
943                 return;         
944         }
945         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
946
947         DLIST_ADD(rec->vacuum_info, v);
948
949         talloc_set_destructor(v, vacuum_info_destructor);
950
951         vacuum_fetch_next(v);
952         talloc_free(tmp_ctx);
953 }
954
955
956 /*
957   called when ctdb_wait_timeout should finish
958  */
959 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
960                               struct timeval yt, void *p)
961 {
962         uint32_t *timed_out = (uint32_t *)p;
963         (*timed_out) = 1;
964 }
965
966 /*
967   wait for a given number of seconds
968  */
969 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
970 {
971         uint32_t timed_out = 0;
972         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
973         while (!timed_out) {
974                 event_loop_once(ctdb->ev);
975         }
976 }
977
978 /*
979   called when an election times out (ends)
980  */
981 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
982                                   struct timeval t, void *p)
983 {
984         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
985         rec->election_timeout = NULL;
986 }
987
988
989 /*
990   wait for an election to finish. It finished election_timeout seconds after
991   the last election packet is received
992  */
993 static void ctdb_wait_election(struct ctdb_recoverd *rec)
994 {
995         struct ctdb_context *ctdb = rec->ctdb;
996         while (rec->election_timeout) {
997                 event_loop_once(ctdb->ev);
998         }
999 }
1000
1001 /*
1002   Update our local flags from all remote connected nodes. 
1003   This is only run when we are or we belive we are the recovery master
1004  */
1005 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1006 {
1007         int j;
1008         struct ctdb_context *ctdb = rec->ctdb;
1009         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1010
1011         /* get the nodemap for all active remote nodes and verify
1012            they are the same as for this node
1013          */
1014         for (j=0; j<nodemap->num; j++) {
1015                 struct ctdb_node_map *remote_nodemap=NULL;
1016                 int ret;
1017
1018                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1019                         continue;
1020                 }
1021                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1022                         continue;
1023                 }
1024
1025                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1026                                            mem_ctx, &remote_nodemap);
1027                 if (ret != 0) {
1028                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1029                                   nodemap->nodes[j].pnn));
1030                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1031                         talloc_free(mem_ctx);
1032                         return MONITOR_FAILED;
1033                 }
1034                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1035                         int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
1036
1037                         if (ban_changed) {
1038                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1039                                 nodemap->nodes[j].pnn,
1040                                 remote_nodemap->nodes[j].flags,
1041                                 nodemap->nodes[j].flags));
1042                         }
1043
1044                         /* We should tell our daemon about this so it
1045                            updates its flags or else we will log the same 
1046                            message again in the next iteration of recovery.
1047                            Since we are the recovery master we can just as
1048                            well update the flags on all nodes.
1049                         */
1050                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1051                         if (ret != 0) {
1052                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1053                                 return -1;
1054                         }
1055
1056                         /* Update our local copy of the flags in the recovery
1057                            daemon.
1058                         */
1059                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1060                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1061                                  nodemap->nodes[j].flags));
1062                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1063
1064                         /* If the BANNED flag has changed for the node
1065                            this is a good reason to do a new election.
1066                          */
1067                         if (ban_changed) {
1068                                 talloc_free(mem_ctx);
1069                                 return MONITOR_ELECTION_NEEDED;
1070                         }
1071
1072                 }
1073                 talloc_free(remote_nodemap);
1074         }
1075         talloc_free(mem_ctx);
1076         return MONITOR_OK;
1077 }
1078
1079
1080 /* Create a new random generation ip. 
1081    The generation id can not be the INVALID_GENERATION id
1082 */
1083 static uint32_t new_generation(void)
1084 {
1085         uint32_t generation;
1086
1087         while (1) {
1088                 generation = random();
1089
1090                 if (generation != INVALID_GENERATION) {
1091                         break;
1092                 }
1093         }
1094
1095         return generation;
1096 }
1097
1098
1099 /*
1100   create a temporary working database
1101  */
1102 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1103 {
1104         char *name;
1105         struct tdb_wrap *recdb;
1106         unsigned tdb_flags;
1107
1108         /* open up the temporary recovery database */
1109         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1110         if (name == NULL) {
1111                 return NULL;
1112         }
1113         unlink(name);
1114
1115         tdb_flags = TDB_NOLOCK;
1116         if (!ctdb->do_setsched) {
1117                 tdb_flags |= TDB_NOMMAP;
1118         }
1119
1120         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1121                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1122         if (recdb == NULL) {
1123                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1124         }
1125
1126         talloc_free(name);
1127
1128         return recdb;
1129 }
1130
1131
1132 /* 
1133    a traverse function for pulling all relevent records from recdb
1134  */
1135 struct recdb_data {
1136         struct ctdb_context *ctdb;
1137         struct ctdb_marshall_buffer *recdata;
1138         uint32_t len;
1139         bool failed;
1140 };
1141
1142 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1143 {
1144         struct recdb_data *params = (struct recdb_data *)p;
1145         struct ctdb_rec_data *rec;
1146         struct ctdb_ltdb_header *hdr;
1147
1148         /* skip empty records */
1149         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1150                 return 0;
1151         }
1152
1153         /* update the dmaster field to point to us */
1154         hdr = (struct ctdb_ltdb_header *)data.dptr;
1155         hdr->dmaster = params->ctdb->pnn;
1156
1157         /* add the record to the blob ready to send to the nodes */
1158         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1159         if (rec == NULL) {
1160                 params->failed = true;
1161                 return -1;
1162         }
1163         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1164         if (params->recdata == NULL) {
1165                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1166                          rec->length + params->len, params->recdata->count));
1167                 params->failed = true;
1168                 return -1;
1169         }
1170         params->recdata->count++;
1171         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1172         params->len += rec->length;
1173         talloc_free(rec);
1174
1175         return 0;
1176 }
1177
1178 /*
1179   push the recdb database out to all nodes
1180  */
1181 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1182                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1183 {
1184         struct recdb_data params;
1185         struct ctdb_marshall_buffer *recdata;
1186         TDB_DATA outdata;
1187         TALLOC_CTX *tmp_ctx;
1188         uint32_t *nodes;
1189
1190         tmp_ctx = talloc_new(ctdb);
1191         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1192
1193         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1194         CTDB_NO_MEMORY(ctdb, recdata);
1195
1196         recdata->db_id = dbid;
1197
1198         params.ctdb = ctdb;
1199         params.recdata = recdata;
1200         params.len = offsetof(struct ctdb_marshall_buffer, data);
1201         params.failed = false;
1202
1203         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1204                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1205                 talloc_free(params.recdata);
1206                 talloc_free(tmp_ctx);
1207                 return -1;
1208         }
1209
1210         if (params.failed) {
1211                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1212                 talloc_free(params.recdata);
1213                 talloc_free(tmp_ctx);
1214                 return -1;              
1215         }
1216
1217         recdata = params.recdata;
1218
1219         outdata.dptr = (void *)recdata;
1220         outdata.dsize = params.len;
1221
1222         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1223         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1224                                         nodes,
1225                                         CONTROL_TIMEOUT(), false, outdata,
1226                                         NULL, NULL,
1227                                         NULL) != 0) {
1228                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1229                 talloc_free(recdata);
1230                 talloc_free(tmp_ctx);
1231                 return -1;
1232         }
1233
1234         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1235                   dbid, recdata->count));
1236
1237         talloc_free(recdata);
1238         talloc_free(tmp_ctx);
1239
1240         return 0;
1241 }
1242
1243
1244 /*
1245   go through a full recovery on one database 
1246  */
1247 static int recover_database(struct ctdb_recoverd *rec, 
1248                             TALLOC_CTX *mem_ctx,
1249                             uint32_t dbid,
1250                             uint32_t pnn, 
1251                             struct ctdb_node_map *nodemap,
1252                             uint32_t transaction_id)
1253 {
1254         struct tdb_wrap *recdb;
1255         int ret;
1256         struct ctdb_context *ctdb = rec->ctdb;
1257         TDB_DATA data;
1258         struct ctdb_control_wipe_database w;
1259         uint32_t *nodes;
1260
1261         recdb = create_recdb(ctdb, mem_ctx);
1262         if (recdb == NULL) {
1263                 return -1;
1264         }
1265
1266         /* pull all remote databases onto the recdb */
1267         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1268         if (ret != 0) {
1269                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1270                 return -1;
1271         }
1272
1273         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1274
1275         /* wipe all the remote databases. This is safe as we are in a transaction */
1276         w.db_id = dbid;
1277         w.transaction_id = transaction_id;
1278
1279         data.dptr = (void *)&w;
1280         data.dsize = sizeof(w);
1281
1282         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1283         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1284                                         nodes,
1285                                         CONTROL_TIMEOUT(), false, data,
1286                                         NULL, NULL,
1287                                         NULL) != 0) {
1288                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1289                 talloc_free(recdb);
1290                 return -1;
1291         }
1292         
1293         /* push out the correct database. This sets the dmaster and skips 
1294            the empty records */
1295         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1296         if (ret != 0) {
1297                 talloc_free(recdb);
1298                 return -1;
1299         }
1300
1301         /* all done with this database */
1302         talloc_free(recdb);
1303
1304         return 0;
1305 }
1306
1307 /*
1308   reload the nodes file 
1309 */
1310 static void reload_nodes_file(struct ctdb_context *ctdb)
1311 {
1312         ctdb->nodes = NULL;
1313         ctdb_load_nodes_file(ctdb);
1314 }
1315
1316         
1317 /*
1318   we are the recmaster, and recovery is needed - start a recovery run
1319  */
1320 static int do_recovery(struct ctdb_recoverd *rec, 
1321                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1322                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1323                        int32_t culprit)
1324 {
1325         struct ctdb_context *ctdb = rec->ctdb;
1326         int i, j, ret;
1327         uint32_t generation;
1328         struct ctdb_dbid_map *dbmap;
1329         TDB_DATA data;
1330         uint32_t *nodes;
1331         struct timeval start_time;
1332
1333         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1334
1335         /* if recovery fails, force it again */
1336         rec->need_recovery = true;
1337
1338         if (culprit != -1) {
1339                 ctdb_set_culprit(rec, culprit);
1340         }
1341
1342         if (rec->culprit_counter > 2*nodemap->num) {
1343                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1344                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1345                          ctdb->tunable.recovery_ban_period));
1346                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
1347         }
1348
1349         DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1350         start_time = timeval_current();
1351         if (!ctdb_recovery_lock(ctdb, true)) {
1352                 ctdb_set_culprit(rec, pnn);
1353                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1354                 return -1;
1355         }
1356         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1357         DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1358
1359         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1360
1361         /* get a list of all databases */
1362         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1363         if (ret != 0) {
1364                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1365                 return -1;
1366         }
1367
1368         /* we do the db creation before we set the recovery mode, so the freeze happens
1369            on all databases we will be dealing with. */
1370
1371         /* verify that we have all the databases any other node has */
1372         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1373         if (ret != 0) {
1374                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1375                 return -1;
1376         }
1377
1378         /* verify that all other nodes have all our databases */
1379         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1380         if (ret != 0) {
1381                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1382                 return -1;
1383         }
1384
1385         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1386
1387
1388         /* set recovery mode to active on all nodes */
1389         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1390         if (ret != 0) {
1391                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1392                 return -1;
1393         }
1394
1395         /* execute the "startrecovery" event script on all nodes */
1396         ret = run_startrecovery_eventscript(rec, nodemap);
1397         if (ret!=0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1399                 return -1;
1400         }
1401
1402         /* pick a new generation number */
1403         generation = new_generation();
1404
1405         /* change the vnnmap on this node to use the new generation 
1406            number but not on any other nodes.
1407            this guarantees that if we abort the recovery prematurely
1408            for some reason (a node stops responding?)
1409            that we can just return immediately and we will reenter
1410            recovery shortly again.
1411            I.e. we deliberately leave the cluster with an inconsistent
1412            generation id to allow us to abort recovery at any stage and
1413            just restart it from scratch.
1414          */
1415         vnnmap->generation = generation;
1416         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1417         if (ret != 0) {
1418                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1419                 return -1;
1420         }
1421
1422         data.dptr = (void *)&generation;
1423         data.dsize = sizeof(uint32_t);
1424
1425         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1426         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1427                                         nodes,
1428                                         CONTROL_TIMEOUT(), false, data,
1429                                         NULL, NULL,
1430                                         NULL) != 0) {
1431                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1432                 return -1;
1433         }
1434
1435         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1436
1437         for (i=0;i<dbmap->num;i++) {
1438                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1439                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1440                         return -1;
1441                 }
1442         }
1443
1444         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1445
1446         /* commit all the changes */
1447         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1448                                         nodes,
1449                                         CONTROL_TIMEOUT(), false, data,
1450                                         NULL, NULL,
1451                                         NULL) != 0) {
1452                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1453                 return -1;
1454         }
1455
1456         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1457         
1458
1459         /* update the capabilities for all nodes */
1460         ret = update_capabilities(ctdb, nodemap);
1461         if (ret!=0) {
1462                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1463                 return -1;
1464         }
1465
1466         /* build a new vnn map with all the currently active and
1467            unbanned nodes */
1468         generation = new_generation();
1469         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1470         CTDB_NO_MEMORY(ctdb, vnnmap);
1471         vnnmap->generation = generation;
1472         vnnmap->size = 0;
1473         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1474         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1475         for (i=j=0;i<nodemap->num;i++) {
1476                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1477                         continue;
1478                 }
1479                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1480                         /* this node can not be an lmaster */
1481                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1482                         continue;
1483                 }
1484
1485                 vnnmap->size++;
1486                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1487                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1488                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1489
1490         }
1491         if (vnnmap->size == 0) {
1492                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1493                 vnnmap->size++;
1494                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1495                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1496                 vnnmap->map[0] = pnn;
1497         }       
1498
1499         /* update to the new vnnmap on all nodes */
1500         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1501         if (ret != 0) {
1502                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1503                 return -1;
1504         }
1505
1506         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1507
1508         /* update recmaster to point to us for all nodes */
1509         ret = set_recovery_master(ctdb, nodemap, pnn);
1510         if (ret!=0) {
1511                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1512                 return -1;
1513         }
1514
1515         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1516
1517         /*
1518           update all nodes to have the same flags that we have
1519          */
1520         for (i=0;i<nodemap->num;i++) {
1521                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1522                         continue;
1523                 }
1524
1525                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1526                 if (ret != 0) {
1527                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1528                         return -1;
1529                 }
1530         }
1531
1532         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1533
1534         /* disable recovery mode */
1535         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1536         if (ret != 0) {
1537                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1538                 return -1;
1539         }
1540
1541         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1542
1543         /*
1544           tell nodes to takeover their public IPs
1545          */
1546         rec->need_takeover_run = false;
1547         ret = ctdb_takeover_run(ctdb, nodemap);
1548         if (ret != 0) {
1549                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1550                 return -1;
1551         }
1552         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1553
1554         /* execute the "recovered" event script on all nodes */
1555         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1556         if (ret!=0) {
1557                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1558                 return -1;
1559         }
1560
1561         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1562
1563         /* send a message to all clients telling them that the cluster 
1564            has been reconfigured */
1565         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1566
1567         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1568
1569         rec->need_recovery = false;
1570
1571         /* We just finished a recovery successfully. 
1572            We now wait for rerecovery_timeout before we allow 
1573            another recovery to take place.
1574         */
1575         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1576         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1577         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1578
1579         return 0;
1580 }
1581
1582
1583 /*
1584   elections are won by first checking the number of connected nodes, then
1585   the priority time, then the pnn
1586  */
1587 struct election_message {
1588         uint32_t num_connected;
1589         struct timeval priority_time;
1590         uint32_t pnn;
1591         uint32_t node_flags;
1592 };
1593
1594 /*
1595   form this nodes election data
1596  */
1597 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1598 {
1599         int ret, i;
1600         struct ctdb_node_map *nodemap;
1601         struct ctdb_context *ctdb = rec->ctdb;
1602
1603         ZERO_STRUCTP(em);
1604
1605         em->pnn = rec->ctdb->pnn;
1606         em->priority_time = rec->priority_time;
1607         em->node_flags = rec->node_flags;
1608
1609         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1610         if (ret != 0) {
1611                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1612                 return;
1613         }
1614
1615         for (i=0;i<nodemap->num;i++) {
1616                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1617                         em->num_connected++;
1618                 }
1619         }
1620
1621         /* we shouldnt try to win this election if we cant be a recmaster */
1622         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1623                 em->num_connected = 0;
1624                 em->priority_time = timeval_current();
1625         }
1626
1627         talloc_free(nodemap);
1628 }
1629
1630 /*
1631   see if the given election data wins
1632  */
1633 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1634 {
1635         struct election_message myem;
1636         int cmp = 0;
1637
1638         ctdb_election_data(rec, &myem);
1639
1640         /* we cant win if we dont have the recmaster capability */
1641         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1642                 return false;
1643         }
1644
1645         /* we cant win if we are banned */
1646         if (rec->node_flags & NODE_FLAGS_BANNED) {
1647                 return false;
1648         }       
1649
1650         /* we will automatically win if the other node is banned */
1651         if (em->node_flags & NODE_FLAGS_BANNED) {
1652                 return true;
1653         }
1654
1655         /* try to use the most connected node */
1656         if (cmp == 0) {
1657                 cmp = (int)myem.num_connected - (int)em->num_connected;
1658         }
1659
1660         /* then the longest running node */
1661         if (cmp == 0) {
1662                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1663         }
1664
1665         if (cmp == 0) {
1666                 cmp = (int)myem.pnn - (int)em->pnn;
1667         }
1668
1669         return cmp > 0;
1670 }
1671
1672 /*
1673   send out an election request
1674  */
1675 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1676 {
1677         int ret;
1678         TDB_DATA election_data;
1679         struct election_message emsg;
1680         uint64_t srvid;
1681         struct ctdb_context *ctdb = rec->ctdb;
1682
1683         srvid = CTDB_SRVID_RECOVERY;
1684
1685         ctdb_election_data(rec, &emsg);
1686
1687         election_data.dsize = sizeof(struct election_message);
1688         election_data.dptr  = (unsigned char *)&emsg;
1689
1690
1691         /* send an election message to all active nodes */
1692         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1693
1694
1695         /* A new node that is already frozen has entered the cluster.
1696            The existing nodes are not frozen and dont need to be frozen
1697            until the election has ended and we start the actual recovery
1698         */
1699         if (update_recmaster == true) {
1700                 /* first we assume we will win the election and set 
1701                    recoverymaster to be ourself on the current node
1702                  */
1703                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1704                 if (ret != 0) {
1705                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1706                         return -1;
1707                 }
1708         }
1709
1710
1711         return 0;
1712 }
1713
1714 /*
1715   this function will unban all nodes in the cluster
1716 */
1717 static void unban_all_nodes(struct ctdb_context *ctdb)
1718 {
1719         int ret, i;
1720         struct ctdb_node_map *nodemap;
1721         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1722         
1723         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1724         if (ret != 0) {
1725                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1726                 return;
1727         }
1728
1729         for (i=0;i<nodemap->num;i++) {
1730                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1731                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1732                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1733                 }
1734         }
1735
1736         talloc_free(tmp_ctx);
1737 }
1738
1739
1740 /*
1741   we think we are winning the election - send a broadcast election request
1742  */
1743 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1744 {
1745         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1746         int ret;
1747
1748         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1749         if (ret != 0) {
1750                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1751         }
1752
1753         talloc_free(rec->send_election_te);
1754         rec->send_election_te = NULL;
1755 }
1756
1757 /*
1758   handler for memory dumps
1759 */
1760 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1761                              TDB_DATA data, void *private_data)
1762 {
1763         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1764         TDB_DATA *dump;
1765         int ret;
1766         struct rd_memdump_reply *rd;
1767
1768         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1769                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1770                 talloc_free(tmp_ctx);
1771                 return;
1772         }
1773         rd = (struct rd_memdump_reply *)data.dptr;
1774
1775         dump = talloc_zero(tmp_ctx, TDB_DATA);
1776         if (dump == NULL) {
1777                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1778                 talloc_free(tmp_ctx);
1779                 return;
1780         }
1781         ret = ctdb_dump_memory(ctdb, dump);
1782         if (ret != 0) {
1783                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1784                 talloc_free(tmp_ctx);
1785                 return;
1786         }
1787
1788 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1789
1790         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1791         if (ret != 0) {
1792                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1793                 talloc_free(tmp_ctx);
1794                 return;
1795         }
1796
1797         talloc_free(tmp_ctx);
1798 }
1799
1800 /*
1801   handler for reload_nodes
1802 */
1803 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1804                              TDB_DATA data, void *private_data)
1805 {
1806         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1807
1808         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1809
1810         reload_nodes_file(rec->ctdb);
1811 }
1812
1813
1814
1815 /*
1816   handler for recovery master elections
1817 */
1818 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1819                              TDB_DATA data, void *private_data)
1820 {
1821         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1822         int ret;
1823         struct election_message *em = (struct election_message *)data.dptr;
1824         TALLOC_CTX *mem_ctx;
1825
1826         /* we got an election packet - update the timeout for the election */
1827         talloc_free(rec->election_timeout);
1828         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1829                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1830                                                 ctdb_election_timeout, rec);
1831
1832         mem_ctx = talloc_new(ctdb);
1833
1834         /* someone called an election. check their election data
1835            and if we disagree and we would rather be the elected node, 
1836            send a new election message to all other nodes
1837          */
1838         if (ctdb_election_win(rec, em)) {
1839                 if (!rec->send_election_te) {
1840                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1841                                                                 timeval_current_ofs(0, 500000),
1842                                                                 election_send_request, rec);
1843                 }
1844                 talloc_free(mem_ctx);
1845                 /*unban_all_nodes(ctdb);*/
1846                 return;
1847         }
1848         
1849         /* we didn't win */
1850         talloc_free(rec->send_election_te);
1851         rec->send_election_te = NULL;
1852
1853         /* release the recmaster lock */
1854         if (em->pnn != ctdb->pnn &&
1855             ctdb->recovery_lock_fd != -1) {
1856                 close(ctdb->recovery_lock_fd);
1857                 ctdb->recovery_lock_fd = -1;
1858                 unban_all_nodes(ctdb);
1859         }
1860
1861         /* ok, let that guy become recmaster then */
1862         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1863         if (ret != 0) {
1864                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1865                 talloc_free(mem_ctx);
1866                 return;
1867         }
1868
1869         /* release any bans */
1870         rec->last_culprit = (uint32_t)-1;
1871         talloc_free(rec->banned_nodes);
1872         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1873         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1874
1875         talloc_free(mem_ctx);
1876         return;
1877 }
1878
1879
1880 /*
1881   force the start of the election process
1882  */
1883 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1884                            struct ctdb_node_map *nodemap)
1885 {
1886         int ret;
1887         struct ctdb_context *ctdb = rec->ctdb;
1888
1889         /* set all nodes to recovery mode to stop all internode traffic */
1890         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1891         if (ret != 0) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1893                 return;
1894         }
1895
1896         talloc_free(rec->election_timeout);
1897         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1898                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1899                                                 ctdb_election_timeout, rec);
1900
1901         ret = send_election_request(rec, pnn, true);
1902         if (ret!=0) {
1903                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1904                 return;
1905         }
1906
1907         /* wait for a few seconds to collect all responses */
1908         ctdb_wait_election(rec);
1909 }
1910
1911
1912
1913 /*
1914   handler for when a node changes its flags
1915 */
1916 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1917                             TDB_DATA data, void *private_data)
1918 {
1919         int ret;
1920         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1921         struct ctdb_node_map *nodemap=NULL;
1922         TALLOC_CTX *tmp_ctx;
1923         uint32_t changed_flags;
1924         int i;
1925         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1926
1927         if (data.dsize != sizeof(*c)) {
1928                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1929                 return;
1930         }
1931
1932         tmp_ctx = talloc_new(ctdb);
1933         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1934
1935         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1936         if (ret != 0) {
1937                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1938                 talloc_free(tmp_ctx);
1939                 return;         
1940         }
1941
1942
1943         for (i=0;i<nodemap->num;i++) {
1944                 if (nodemap->nodes[i].pnn == c->pnn) break;
1945         }
1946
1947         if (i == nodemap->num) {
1948                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1949                 talloc_free(tmp_ctx);
1950                 return;
1951         }
1952
1953         changed_flags = c->old_flags ^ c->new_flags;
1954
1955         if (nodemap->nodes[i].flags != c->new_flags) {
1956                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1957         }
1958
1959         nodemap->nodes[i].flags = c->new_flags;
1960
1961         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1962                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1963
1964         if (ret == 0) {
1965                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1966                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1967         }
1968         
1969         if (ret == 0 &&
1970             ctdb->recovery_master == ctdb->pnn &&
1971             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1972                 /* Only do the takeover run if the perm disabled or unhealthy
1973                    flags changed since these will cause an ip failover but not
1974                    a recovery.
1975                    If the node became disconnected or banned this will also
1976                    lead to an ip address failover but that is handled 
1977                    during recovery
1978                 */
1979                 if (changed_flags & NODE_FLAGS_DISABLED) {
1980                         rec->need_takeover_run = true;
1981                 }
1982         }
1983
1984         talloc_free(tmp_ctx);
1985 }
1986
1987 /*
1988   handler for when we need to push out flag changes ot all other nodes
1989 */
1990 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1991                             TDB_DATA data, void *private_data)
1992 {
1993         int ret;
1994         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1995
1996         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
1997         if (ret != 0) {
1998                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1999         }
2000 }
2001
2002
2003 struct verify_recmode_normal_data {
2004         uint32_t count;
2005         enum monitor_result status;
2006 };
2007
2008 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2009 {
2010         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2011
2012
2013         /* one more node has responded with recmode data*/
2014         rmdata->count--;
2015
2016         /* if we failed to get the recmode, then return an error and let
2017            the main loop try again.
2018         */
2019         if (state->state != CTDB_CONTROL_DONE) {
2020                 if (rmdata->status == MONITOR_OK) {
2021                         rmdata->status = MONITOR_FAILED;
2022                 }
2023                 return;
2024         }
2025
2026         /* if we got a response, then the recmode will be stored in the
2027            status field
2028         */
2029         if (state->status != CTDB_RECOVERY_NORMAL) {
2030                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2031                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2032         }
2033
2034         return;
2035 }
2036
2037
2038 /* verify that all nodes are in normal recovery mode */
2039 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2040 {
2041         struct verify_recmode_normal_data *rmdata;
2042         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2043         struct ctdb_client_control_state *state;
2044         enum monitor_result status;
2045         int j;
2046         
2047         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2048         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2049         rmdata->count  = 0;
2050         rmdata->status = MONITOR_OK;
2051
2052         /* loop over all active nodes and send an async getrecmode call to 
2053            them*/
2054         for (j=0; j<nodemap->num; j++) {
2055                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2056                         continue;
2057                 }
2058                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2059                                         CONTROL_TIMEOUT(), 
2060                                         nodemap->nodes[j].pnn);
2061                 if (state == NULL) {
2062                         /* we failed to send the control, treat this as 
2063                            an error and try again next iteration
2064                         */                      
2065                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2066                         talloc_free(mem_ctx);
2067                         return MONITOR_FAILED;
2068                 }
2069
2070                 /* set up the callback functions */
2071                 state->async.fn = verify_recmode_normal_callback;
2072                 state->async.private_data = rmdata;
2073
2074                 /* one more control to wait for to complete */
2075                 rmdata->count++;
2076         }
2077
2078
2079         /* now wait for up to the maximum number of seconds allowed
2080            or until all nodes we expect a response from has replied
2081         */
2082         while (rmdata->count > 0) {
2083                 event_loop_once(ctdb->ev);
2084         }
2085
2086         status = rmdata->status;
2087         talloc_free(mem_ctx);
2088         return status;
2089 }
2090
2091
2092 struct verify_recmaster_data {
2093         struct ctdb_recoverd *rec;
2094         uint32_t count;
2095         uint32_t pnn;
2096         enum monitor_result status;
2097 };
2098
2099 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2100 {
2101         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2102
2103
2104         /* one more node has responded with recmaster data*/
2105         rmdata->count--;
2106
2107         /* if we failed to get the recmaster, then return an error and let
2108            the main loop try again.
2109         */
2110         if (state->state != CTDB_CONTROL_DONE) {
2111                 if (rmdata->status == MONITOR_OK) {
2112                         rmdata->status = MONITOR_FAILED;
2113                 }
2114                 return;
2115         }
2116
2117         /* if we got a response, then the recmaster will be stored in the
2118            status field
2119         */
2120         if (state->status != rmdata->pnn) {
2121                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2122                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2123                 rmdata->status = MONITOR_ELECTION_NEEDED;
2124         }
2125
2126         return;
2127 }
2128
2129
2130 /* verify that all nodes agree that we are the recmaster */
2131 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2132 {
2133         struct ctdb_context *ctdb = rec->ctdb;
2134         struct verify_recmaster_data *rmdata;
2135         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2136         struct ctdb_client_control_state *state;
2137         enum monitor_result status;
2138         int j;
2139         
2140         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2141         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2142         rmdata->rec    = rec;
2143         rmdata->count  = 0;
2144         rmdata->pnn    = pnn;
2145         rmdata->status = MONITOR_OK;
2146
2147         /* loop over all active nodes and send an async getrecmaster call to 
2148            them*/
2149         for (j=0; j<nodemap->num; j++) {
2150                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2151                         continue;
2152                 }
2153                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2154                                         CONTROL_TIMEOUT(),
2155                                         nodemap->nodes[j].pnn);
2156                 if (state == NULL) {
2157                         /* we failed to send the control, treat this as 
2158                            an error and try again next iteration
2159                         */                      
2160                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2161                         talloc_free(mem_ctx);
2162                         return MONITOR_FAILED;
2163                 }
2164
2165                 /* set up the callback functions */
2166                 state->async.fn = verify_recmaster_callback;
2167                 state->async.private_data = rmdata;
2168
2169                 /* one more control to wait for to complete */
2170                 rmdata->count++;
2171         }
2172
2173
2174         /* now wait for up to the maximum number of seconds allowed
2175            or until all nodes we expect a response from has replied
2176         */
2177         while (rmdata->count > 0) {
2178                 event_loop_once(ctdb->ev);
2179         }
2180
2181         status = rmdata->status;
2182         talloc_free(mem_ctx);
2183         return status;
2184 }
2185
2186
2187 /* called to check that the allocation of public ip addresses is ok.
2188 */
2189 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2190 {
2191         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2192         struct ctdb_all_public_ips *ips = NULL;
2193         struct ctdb_uptime *uptime1 = NULL;
2194         struct ctdb_uptime *uptime2 = NULL;
2195         int ret, j;
2196
2197         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2198                                 CTDB_CURRENT_NODE, &uptime1);
2199         if (ret != 0) {
2200                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2201                 talloc_free(mem_ctx);
2202                 return -1;
2203         }
2204
2205         /* read the ip allocation from the local node */
2206         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2207         if (ret != 0) {
2208                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2209                 talloc_free(mem_ctx);
2210                 return -1;
2211         }
2212
2213         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2214                                 CTDB_CURRENT_NODE, &uptime2);
2215         if (ret != 0) {
2216                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2217                 talloc_free(mem_ctx);
2218                 return -1;
2219         }
2220
2221         /* skip the check if the startrecovery time has changed */
2222         if (timeval_compare(&uptime1->last_recovery_started,
2223                             &uptime2->last_recovery_started) != 0) {
2224                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2225                 talloc_free(mem_ctx);
2226                 return 0;
2227         }
2228
2229         /* skip the check if the endrecovery time has changed */
2230         if (timeval_compare(&uptime1->last_recovery_finished,
2231                             &uptime2->last_recovery_finished) != 0) {
2232                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2233                 talloc_free(mem_ctx);
2234                 return 0;
2235         }
2236
2237         /* skip the check if we have started but not finished recovery */
2238         if (timeval_compare(&uptime1->last_recovery_finished,
2239                             &uptime1->last_recovery_started) != 1) {
2240                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2241                 talloc_free(mem_ctx);
2242
2243                 return 0;
2244         }
2245
2246         /* verify that we have the ip addresses we should have
2247            and we dont have ones we shouldnt have.
2248            if we find an inconsistency we set recmode to
2249            active on the local node and wait for the recmaster
2250            to do a full blown recovery
2251         */
2252         for (j=0; j<ips->num; j++) {
2253                 if (ips->ips[j].pnn == pnn) {
2254                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2255                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2256                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2257                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2258                                 if (ret != 0) {
2259                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2260
2261                                         talloc_free(mem_ctx);
2262                                         return -1;
2263                                 }
2264                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2265                                 if (ret != 0) {
2266                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2267
2268                                         talloc_free(mem_ctx);
2269                                         return -1;
2270                                 }
2271                         }
2272                 } else {
2273                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2274                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2275                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2276
2277                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2278                                 if (ret != 0) {
2279                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2280
2281                                         talloc_free(mem_ctx);
2282                                         return -1;
2283                                 }
2284                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2285                                 if (ret != 0) {
2286                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2287
2288                                         talloc_free(mem_ctx);
2289                                         return -1;
2290                                 }
2291                         }
2292                 }
2293         }
2294
2295         talloc_free(mem_ctx);
2296         return 0;
2297 }
2298
2299
2300 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2301 {
2302         struct ctdb_node_map **remote_nodemaps = callback_data;
2303
2304         if (node_pnn >= ctdb->num_nodes) {
2305                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2306                 return;
2307         }
2308
2309         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2310
2311 }
2312
2313 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2314         struct ctdb_node_map *nodemap,
2315         struct ctdb_node_map **remote_nodemaps)
2316 {
2317         uint32_t *nodes;
2318
2319         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2320         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2321                                         nodes,
2322                                         CONTROL_TIMEOUT(), false, tdb_null,
2323                                         async_getnodemap_callback,
2324                                         NULL,
2325                                         remote_nodemaps) != 0) {
2326                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2327
2328                 return -1;
2329         }
2330
2331         return 0;
2332 }
2333
2334 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2335 struct ctdb_check_reclock_state {
2336         struct ctdb_context *ctdb;
2337         struct timeval start_time;
2338         int fd[2];
2339         pid_t child;
2340         struct timed_event *te;
2341         struct fd_event *fde;
2342         enum reclock_child_status status;
2343 };
2344
2345 /* when we free the reclock state we must kill any child process.
2346 */
2347 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2348 {
2349         struct ctdb_context *ctdb = state->ctdb;
2350
2351         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2352
2353         if (state->fd[0] != -1) {
2354                 close(state->fd[0]);
2355                 state->fd[0] = -1;
2356         }
2357         if (state->fd[1] != -1) {
2358                 close(state->fd[1]);
2359                 state->fd[1] = -1;
2360         }
2361         kill(state->child, SIGKILL);
2362         return 0;
2363 }
2364
2365 /*
2366   called if our check_reclock child times out. this would happen if
2367   i/o to the reclock file blocks.
2368  */
2369 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2370                                          struct timeval t, void *private_data)
2371 {
2372         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2373                                            struct ctdb_check_reclock_state);
2374
2375         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2376         state->status = RECLOCK_TIMEOUT;
2377 }
2378
2379 /* this is called when the child process has completed checking the reclock
2380    file and has written data back to us through the pipe.
2381 */
2382 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2383                              uint16_t flags, void *private_data)
2384 {
2385         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2386                                              struct ctdb_check_reclock_state);
2387         char c = 0;
2388         int ret;
2389
2390         /* we got a response from our child process so we can abort the
2391            timeout.
2392         */
2393         talloc_free(state->te);
2394         state->te = NULL;
2395
2396         ret = read(state->fd[0], &c, 1);
2397         if (ret != 1 || c != RECLOCK_OK) {
2398                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2399                 state->status = RECLOCK_FAILED;
2400
2401                 return;
2402         }
2403
2404         state->status = RECLOCK_OK;
2405         return;
2406 }
2407
2408 static int check_recovery_lock(struct ctdb_context *ctdb)
2409 {
2410         int ret;
2411         struct ctdb_check_reclock_state *state;
2412         pid_t parent = getpid();
2413
2414         if (ctdb->recovery_lock_fd == -1) {
2415                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2416                 return -1;
2417         }
2418
2419         state = talloc(ctdb, struct ctdb_check_reclock_state);
2420         CTDB_NO_MEMORY(ctdb, state);
2421
2422         state->ctdb = ctdb;
2423         state->start_time = timeval_current();
2424         state->status = RECLOCK_CHECKING;
2425         state->fd[0] = -1;
2426         state->fd[1] = -1;
2427
2428         ret = pipe(state->fd);
2429         if (ret != 0) {
2430                 talloc_free(state);
2431                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2432                 return -1;
2433         }
2434
2435         state->child = fork();
2436         if (state->child == (pid_t)-1) {
2437                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2438                 close(state->fd[0]);
2439                 state->fd[0] = -1;
2440                 close(state->fd[1]);
2441                 state->fd[1] = -1;
2442                 talloc_free(state);
2443                 return -1;
2444         }
2445
2446         if (state->child == 0) {
2447                 char cc = RECLOCK_OK;
2448                 close(state->fd[0]);
2449                 state->fd[0] = -1;
2450
2451                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2452                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2453                         cc = RECLOCK_FAILED;
2454                 }
2455
2456                 write(state->fd[1], &cc, 1);
2457                 /* make sure we die when our parent dies */
2458                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2459                         sleep(5);
2460                         write(state->fd[1], &cc, 1);
2461                 }
2462                 _exit(0);
2463         }
2464         close(state->fd[1]);
2465         state->fd[1] = -1;
2466
2467         talloc_set_destructor(state, check_reclock_destructor);
2468
2469         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2470                                     ctdb_check_reclock_timeout, state);
2471         if (state->te == NULL) {
2472                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2473                 talloc_free(state);
2474                 return -1;
2475         }
2476
2477         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2478                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2479                                 reclock_child_handler,
2480                                 (void *)state);
2481
2482         if (state->fde == NULL) {
2483                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2484                 talloc_free(state);
2485                 return -1;
2486         }
2487
2488         while (state->status == RECLOCK_CHECKING) {
2489                 event_loop_once(ctdb->ev);
2490         }
2491
2492         if (state->status == RECLOCK_FAILED) {
2493                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2494                 close(ctdb->recovery_lock_fd);
2495                 ctdb->recovery_lock_fd = -1;
2496                 talloc_free(state);
2497                 return -1;
2498         }
2499
2500         talloc_free(state);
2501         return 0;
2502 }
2503
2504 /*
2505   the main monitoring loop
2506  */
2507 static void monitor_cluster(struct ctdb_context *ctdb)
2508 {
2509         uint32_t pnn;
2510         TALLOC_CTX *mem_ctx=NULL;
2511         struct ctdb_node_map *nodemap=NULL;
2512         struct ctdb_node_map *recmaster_nodemap=NULL;
2513         struct ctdb_node_map **remote_nodemaps=NULL;
2514         struct ctdb_vnn_map *vnnmap=NULL;
2515         struct ctdb_vnn_map *remote_vnnmap=NULL;
2516         int32_t debug_level;
2517         int i, j, ret;
2518         struct ctdb_recoverd *rec;
2519
2520         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2521
2522         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2523         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2524
2525         rec->ctdb = ctdb;
2526         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2527         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2528
2529         rec->priority_time = timeval_current();
2530
2531         /* register a message port for sending memory dumps */
2532         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2533
2534         /* register a message port for recovery elections */
2535         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2536
2537         /* when nodes are disabled/enabled */
2538         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2539
2540         /* when we are asked to puch out a flag change */
2541         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2542
2543         /* when nodes are banned */
2544         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2545
2546         /* and one for when nodes are unbanned */
2547         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2548
2549         /* register a message port for vacuum fetch */
2550         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2551
2552         /* register a message port for reloadnodes  */
2553         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2554
2555 again:
2556         if (mem_ctx) {
2557                 talloc_free(mem_ctx);
2558                 mem_ctx = NULL;
2559         }
2560         mem_ctx = talloc_new(ctdb);
2561         if (!mem_ctx) {
2562                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2563                 exit(-1);
2564         }
2565
2566         /* we only check for recovery once every second */
2567         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2568
2569         /* verify that the main daemon is still running */
2570         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2571                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2572                 exit(-1);
2573         }
2574
2575         /* ping the local daemon to tell it we are alive */
2576         ctdb_ctrl_recd_ping(ctdb);
2577
2578         if (rec->election_timeout) {
2579                 /* an election is in progress */
2580                 goto again;
2581         }
2582
2583         /* read the debug level from the parent and update locally */
2584         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2585         if (ret !=0) {
2586                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2587                 goto again;
2588         }
2589         LogLevel = debug_level;
2590
2591
2592         /* We must check if we need to ban a node here but we want to do this
2593            as early as possible so we dont wait until we have pulled the node
2594            map from the local node. thats why we have the hardcoded value 20
2595         */
2596         if (rec->culprit_counter > 20) {
2597                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2598                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2599                          ctdb->tunable.recovery_ban_period));
2600                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2601         }
2602
2603         /* get relevant tunables */
2604         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2605         if (ret != 0) {
2606                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2607                 goto again;
2608         }
2609
2610         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2611         if (pnn == (uint32_t)-1) {
2612                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2613                 goto again;
2614         }
2615
2616         /* get the vnnmap */
2617         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2618         if (ret != 0) {
2619                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2620                 goto again;
2621         }
2622
2623
2624         /* get number of nodes */
2625         if (rec->nodemap) {
2626                 talloc_free(rec->nodemap);
2627                 rec->nodemap = NULL;
2628                 nodemap=NULL;
2629         }
2630         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2631         if (ret != 0) {
2632                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2633                 goto again;
2634         }
2635         nodemap = rec->nodemap;
2636
2637         /* check which node is the recovery master */
2638         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2639         if (ret != 0) {
2640                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2641                 goto again;
2642         }
2643
2644         if (rec->recmaster == (uint32_t)-1) {
2645                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2646                 force_election(rec, pnn, nodemap);
2647                 goto again;
2648         }
2649         
2650         /* check that we (recovery daemon) and the local ctdb daemon
2651            agrees on whether we are banned or not
2652         */
2653         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2654                 if (rec->banned_nodes[pnn] == NULL) {
2655                         if (rec->recmaster == pnn) {
2656                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2657
2658                                 ctdb_unban_node(rec, pnn);
2659                         } else {
2660                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2661                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2662                                 ctdb_set_culprit(rec, pnn);
2663                         }
2664                         goto again;
2665                 }
2666         } else {
2667                 if (rec->banned_nodes[pnn] != NULL) {
2668                         if (rec->recmaster == pnn) {
2669                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2670
2671                                 ctdb_unban_node(rec, pnn);
2672                         } else {
2673                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2674
2675                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2676                                 ctdb_set_culprit(rec, pnn);
2677                         }
2678                         goto again;
2679                 }
2680         }
2681
2682         /* remember our own node flags */
2683         rec->node_flags = nodemap->nodes[pnn].flags;
2684
2685         /* count how many active nodes there are */
2686         rec->num_active    = 0;
2687         rec->num_connected = 0;
2688         for (i=0; i<nodemap->num; i++) {
2689                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2690                         rec->num_active++;
2691                 }
2692                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2693                         rec->num_connected++;
2694                 }
2695         }
2696
2697
2698         /* verify that the recmaster node is still active */
2699         for (j=0; j<nodemap->num; j++) {
2700                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2701                         break;
2702                 }
2703         }
2704
2705         if (j == nodemap->num) {
2706                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2707                 force_election(rec, pnn, nodemap);
2708                 goto again;
2709         }
2710
2711         /* if recovery master is disconnected we must elect a new recmaster */
2712         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2713                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2714                 force_election(rec, pnn, nodemap);
2715                 goto again;
2716         }
2717
2718         /* grap the nodemap from the recovery master to check if it is banned */
2719         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2720                                    mem_ctx, &recmaster_nodemap);
2721         if (ret != 0) {
2722                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2723                           nodemap->nodes[j].pnn));
2724                 goto again;
2725         }
2726
2727
2728         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2729                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2730                 force_election(rec, pnn, nodemap);
2731                 goto again;
2732         }
2733
2734
2735         /* verify that we have all ip addresses we should have and we dont
2736          * have addresses we shouldnt have.
2737          */ 
2738         if (ctdb->do_checkpublicip) {
2739                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2740                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2741                         goto again;
2742                 }
2743         }
2744
2745
2746         /* if we are not the recmaster then we do not need to check
2747            if recovery is needed
2748          */
2749         if (pnn != rec->recmaster) {
2750                 goto again;
2751         }
2752
2753
2754         /* ensure our local copies of flags are right */
2755         ret = update_local_flags(rec, nodemap);
2756         if (ret == MONITOR_ELECTION_NEEDED) {
2757                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2758                 force_election(rec, pnn, nodemap);
2759                 goto again;
2760         }
2761         if (ret != MONITOR_OK) {
2762                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2763                 goto again;
2764         }
2765
2766         /* update the list of public ips that a node can handle for
2767            all connected nodes
2768         */
2769         if (ctdb->num_nodes != nodemap->num) {
2770                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2771                 reload_nodes_file(ctdb);
2772                 goto again;
2773         }
2774         for (j=0; j<nodemap->num; j++) {
2775                 /* release any existing data */
2776                 if (ctdb->nodes[j]->public_ips) {
2777                         talloc_free(ctdb->nodes[j]->public_ips);
2778                         ctdb->nodes[j]->public_ips = NULL;
2779                 }
2780
2781                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2782                         continue;
2783                 }
2784
2785                 /* grab a new shiny list of public ips from the node */
2786                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2787                         ctdb->nodes[j]->pnn, 
2788                         ctdb->nodes,
2789                         &ctdb->nodes[j]->public_ips)) {
2790                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2791                                 ctdb->nodes[j]->pnn));
2792                         goto again;
2793                 }
2794         }
2795
2796
2797         /* verify that all active nodes agree that we are the recmaster */
2798         switch (verify_recmaster(rec, nodemap, pnn)) {
2799         case MONITOR_RECOVERY_NEEDED:
2800                 /* can not happen */
2801                 goto again;
2802         case MONITOR_ELECTION_NEEDED:
2803                 force_election(rec, pnn, nodemap);
2804                 goto again;
2805         case MONITOR_OK:
2806                 break;
2807         case MONITOR_FAILED:
2808                 goto again;
2809         }
2810
2811
2812         if (rec->need_recovery) {
2813                 /* a previous recovery didn't finish */
2814                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
2815                 goto again;             
2816         }
2817
2818         /* verify that all active nodes are in normal mode 
2819            and not in recovery mode 
2820          */
2821         switch (verify_recmode(ctdb, nodemap)) {
2822         case MONITOR_RECOVERY_NEEDED:
2823                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2824                 goto again;
2825         case MONITOR_FAILED:
2826                 goto again;
2827         case MONITOR_ELECTION_NEEDED:
2828                 /* can not happen */
2829         case MONITOR_OK:
2830                 break;
2831         }
2832
2833
2834         /* we should have the reclock - check its not stale */
2835         ret = check_recovery_lock(ctdb);
2836         if (ret != 0) {
2837                 DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2838                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2839                 goto again;
2840         }
2841
2842         /* get the nodemap for all active remote nodes
2843          */
2844         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2845         if (remote_nodemaps == NULL) {
2846                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2847                 goto again;
2848         }
2849         for(i=0; i<nodemap->num; i++) {
2850                 remote_nodemaps[i] = NULL;
2851         }
2852         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2853                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2854                 goto again;
2855         } 
2856
2857         /* verify that all other nodes have the same nodemap as we have
2858         */
2859         for (j=0; j<nodemap->num; j++) {
2860                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2861                         continue;
2862                 }
2863
2864                 if (remote_nodemaps[j] == NULL) {
2865                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2866                         ctdb_set_culprit(rec, j);
2867
2868                         goto again;
2869                 }
2870
2871                 /* if the nodes disagree on how many nodes there are
2872                    then this is a good reason to try recovery
2873                  */
2874                 if (remote_nodemaps[j]->num != nodemap->num) {
2875                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2876                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2877                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2878                         goto again;
2879                 }
2880
2881                 /* if the nodes disagree on which nodes exist and are
2882                    active, then that is also a good reason to do recovery
2883                  */
2884                 for (i=0;i<nodemap->num;i++) {
2885                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2886                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2887                                           nodemap->nodes[j].pnn, i, 
2888                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2889                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2890                                             vnnmap, nodemap->nodes[j].pnn);
2891                                 goto again;
2892                         }
2893                 }
2894
2895                 /* verify the flags are consistent
2896                 */
2897                 for (i=0; i<nodemap->num; i++) {
2898                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2899                                 continue;
2900                         }
2901                         
2902                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2903                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2904                                   nodemap->nodes[j].pnn, 
2905                                   nodemap->nodes[i].pnn, 
2906                                   remote_nodemaps[j]->nodes[i].flags,
2907                                   nodemap->nodes[j].flags));
2908                                 if (i == j) {
2909                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2910                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2911                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2912                                                     vnnmap, nodemap->nodes[j].pnn);
2913                                         goto again;
2914                                 } else {
2915                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2916                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2917                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2918                                                     vnnmap, nodemap->nodes[j].pnn);
2919                                         goto again;
2920                                 }
2921                         }
2922                 }
2923         }
2924
2925
2926         /* there better be the same number of lmasters in the vnn map
2927            as there are active nodes or we will have to do a recovery
2928          */
2929         if (vnnmap->size != rec->num_active) {
2930                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2931                           vnnmap->size, rec->num_active));
2932                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2933                 goto again;
2934         }
2935
2936         /* verify that all active nodes in the nodemap also exist in 
2937            the vnnmap.
2938          */
2939         for (j=0; j<nodemap->num; j++) {
2940                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2941                         continue;
2942                 }
2943                 if (nodemap->nodes[j].pnn == pnn) {
2944                         continue;
2945                 }
2946
2947                 for (i=0; i<vnnmap->size; i++) {
2948                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2949                                 break;
2950                         }
2951                 }
2952                 if (i == vnnmap->size) {
2953                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2954                                   nodemap->nodes[j].pnn));
2955                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2956                         goto again;
2957                 }
2958         }
2959
2960         
2961         /* verify that all other nodes have the same vnnmap
2962            and are from the same generation
2963          */
2964         for (j=0; j<nodemap->num; j++) {
2965                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2966                         continue;
2967                 }
2968                 if (nodemap->nodes[j].pnn == pnn) {
2969                         continue;
2970                 }
2971
2972                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2973                                           mem_ctx, &remote_vnnmap);
2974                 if (ret != 0) {
2975                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2976                                   nodemap->nodes[j].pnn));
2977                         goto again;
2978                 }
2979
2980                 /* verify the vnnmap generation is the same */
2981                 if (vnnmap->generation != remote_vnnmap->generation) {
2982                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2983                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2984                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2985                         goto again;
2986                 }
2987
2988                 /* verify the vnnmap size is the same */
2989                 if (vnnmap->size != remote_vnnmap->size) {
2990                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2991                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2992                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2993                         goto again;
2994                 }
2995
2996                 /* verify the vnnmap is the same */
2997                 for (i=0;i<vnnmap->size;i++) {
2998                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2999                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3000                                           nodemap->nodes[j].pnn));
3001                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3002                                             vnnmap, nodemap->nodes[j].pnn);
3003                                 goto again;
3004                         }
3005                 }
3006         }
3007
3008         /* we might need to change who has what IP assigned */
3009         if (rec->need_takeover_run) {
3010                 rec->need_takeover_run = false;
3011
3012                 /* execute the "startrecovery" event script on all nodes */
3013                 ret = run_startrecovery_eventscript(rec, nodemap);
3014                 if (ret!=0) {
3015                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3016                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3017                                     vnnmap, ctdb->pnn);
3018                 }
3019
3020                 ret = ctdb_takeover_run(ctdb, nodemap);
3021                 if (ret != 0) {
3022                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3023                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3024                                     vnnmap, ctdb->pnn);
3025                 }
3026
3027                 /* execute the "recovered" event script on all nodes */
3028                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3029 #if 0
3030 // we cant check whether the event completed successfully
3031 // since this script WILL fail if the node is in recovery mode
3032 // and if that race happens, the code here would just cause a second
3033 // cascading recovery.
3034                 if (ret!=0) {
3035                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3036                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3037                                     vnnmap, ctdb->pnn);
3038                 }
3039 #endif
3040         }
3041
3042
3043         goto again;
3044
3045 }
3046
3047 /*
3048   event handler for when the main ctdbd dies
3049  */
3050 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3051                                  uint16_t flags, void *private_data)
3052 {
3053         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3054         _exit(1);
3055 }
3056
3057 /*
3058   called regularly to verify that the recovery daemon is still running
3059  */
3060 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3061                               struct timeval yt, void *p)
3062 {
3063         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3064
3065         if (kill(ctdb->recoverd_pid, 0) != 0) {
3066                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3067
3068                 ctdb_stop_recoverd(ctdb);
3069                 ctdb_stop_keepalive(ctdb);
3070                 ctdb_stop_monitoring(ctdb);
3071                 ctdb_release_all_ips(ctdb);
3072                 if (ctdb->methods != NULL) {
3073                         ctdb->methods->shutdown(ctdb);
3074                 }
3075                 ctdb_event_script(ctdb, "shutdown");
3076
3077                 exit(10);       
3078         }
3079
3080         event_add_timed(ctdb->ev, ctdb, 
3081                         timeval_current_ofs(30, 0),
3082                         ctdb_check_recd, ctdb);
3083 }
3084
3085 static void recd_sig_child_handler(struct event_context *ev,
3086         struct signal_event *se, int signum, int count,
3087         void *dont_care, 
3088         void *private_data)
3089 {
3090 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3091         int status;
3092         pid_t pid = -1;
3093
3094         while (pid != 0) {
3095                 pid = waitpid(-1, &status, WNOHANG);
3096                 if (pid == -1) {
3097                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
3098                         return;
3099                 }
3100                 if (pid > 0) {
3101                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3102                 }
3103         }
3104 }
3105
3106 /*
3107   startup the recovery daemon as a child of the main ctdb daemon
3108  */
3109 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3110 {
3111         int fd[2];
3112         struct signal_event *se;
3113
3114         if (pipe(fd) != 0) {
3115                 return -1;
3116         }
3117
3118         ctdb->ctdbd_pid = getpid();
3119
3120         ctdb->recoverd_pid = fork();
3121         if (ctdb->recoverd_pid == -1) {
3122                 return -1;
3123         }
3124         
3125         if (ctdb->recoverd_pid != 0) {
3126                 close(fd[0]);
3127                 event_add_timed(ctdb->ev, ctdb, 
3128                                 timeval_current_ofs(30, 0),
3129                                 ctdb_check_recd, ctdb);
3130                 return 0;
3131         }
3132
3133         close(fd[1]);
3134
3135         srandom(getpid() ^ time(NULL));
3136
3137         if (switch_from_server_to_client(ctdb) != 0) {
3138                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3139                 exit(1);
3140         }
3141
3142         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3143                      ctdb_recoverd_parent, &fd[0]);     
3144
3145         /* set up a handler to pick up sigchld */
3146         se = event_add_signal(ctdb->ev, ctdb,
3147                                      SIGCHLD, 0,
3148                                      recd_sig_child_handler,
3149                                      ctdb);
3150         if (se == NULL) {
3151                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3152                 exit(1);
3153         }
3154
3155         monitor_cluster(ctdb);
3156
3157         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3158         return -1;
3159 }
3160
3161 /*
3162   shutdown the recovery daemon
3163  */
3164 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3165 {
3166         if (ctdb->recoverd_pid == 0) {
3167                 return;
3168         }
3169
3170         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3171         kill(ctdb->recoverd_pid, SIGTERM);
3172 }