a367630f32a340b60af9bafacb97bf3f01eb4187
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         uint32_t recmaster;
45         uint32_t num_active;
46         uint32_t num_connected;
47         struct ctdb_node_map *nodemap;
48         uint32_t last_culprit;
49         uint32_t culprit_counter;
50         struct timeval first_recover_time;
51         struct ban_state **banned_nodes;
52         struct timeval priority_time;
53         bool need_takeover_run;
54         bool need_recovery;
55         uint32_t node_flags;
56         struct timed_event *send_election_te;
57         struct timed_event *election_timeout;
58         struct vacuum_info *vacuum_info;
59 };
60
61 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
62 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
63
64
65 /*
66   unban a node
67  */
68 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
69 {
70         struct ctdb_context *ctdb = rec->ctdb;
71
72         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
73
74         if (!ctdb_validate_pnn(ctdb, pnn)) {
75                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
76                 return;
77         }
78
79         /* If we are unbanning a different node then just pass the ban info on */
80         if (pnn != ctdb->pnn) {
81                 TDB_DATA data;
82                 int ret;
83                 
84                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
85
86                 data.dptr = (uint8_t *)&pnn;
87                 data.dsize = sizeof(uint32_t);
88
89                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
90                 if (ret != 0) {
91                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
92                         return;
93                 }
94
95                 return;
96         }
97
98         /* make sure we remember we are no longer banned in case 
99            there is an election */
100         rec->node_flags &= ~NODE_FLAGS_BANNED;
101
102         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
103         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
104
105         if (rec->banned_nodes[pnn] == NULL) {
106                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
107                 return;
108         }
109
110         talloc_free(rec->banned_nodes[pnn]);
111         rec->banned_nodes[pnn] = NULL;
112 }
113
114
115 /*
116   called when a ban has timed out
117  */
118 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
119 {
120         struct ban_state *state = talloc_get_type(p, struct ban_state);
121         struct ctdb_recoverd *rec = state->rec;
122         uint32_t pnn = state->banned_node;
123
124         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
125         ctdb_unban_node(rec, pnn);
126 }
127
128 /*
129   ban a node for a period of time
130  */
131 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
132 {
133         struct ctdb_context *ctdb = rec->ctdb;
134
135         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
136
137         if (!ctdb_validate_pnn(ctdb, pnn)) {
138                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
139                 return;
140         }
141
142         if (0 == ctdb->tunable.enable_bans) {
143                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
144                 return;
145         }
146
147         /* If we are banning a different node then just pass the ban info on */
148         if (pnn != ctdb->pnn) {
149                 struct ctdb_ban_info b;
150                 TDB_DATA data;
151                 int ret;
152                 
153                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
154
155                 b.pnn = pnn;
156                 b.ban_time = ban_time;
157
158                 data.dptr = (uint8_t *)&b;
159                 data.dsize = sizeof(b);
160
161                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
162                 if (ret != 0) {
163                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
164                         return;
165                 }
166
167                 return;
168         }
169
170         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
171         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
172
173         /* banning ourselves - lower our election priority */
174         rec->priority_time = timeval_current();
175
176         /* make sure we remember we are banned in case there is an 
177            election */
178         rec->node_flags |= NODE_FLAGS_BANNED;
179
180         if (rec->banned_nodes[pnn] != NULL) {
181                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
182                 talloc_free(rec->banned_nodes[pnn]);
183                 rec->banned_nodes[pnn] = NULL;
184         }
185
186         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
187         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
188
189         rec->banned_nodes[pnn]->rec = rec;
190         rec->banned_nodes[pnn]->banned_node = pnn;
191
192         if (ban_time != 0) {
193                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
194                                 timeval_current_ofs(ban_time, 0),
195                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
196         }
197 }
198
199 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
200
201
202 /*
203   run the "recovered" eventscript on all nodes
204  */
205 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
206 {
207         TALLOC_CTX *tmp_ctx;
208         uint32_t *nodes;
209
210         tmp_ctx = talloc_new(ctdb);
211         CTDB_NO_MEMORY(ctdb, tmp_ctx);
212
213         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
214         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
215                                         nodes,
216                                         CONTROL_TIMEOUT(), false, tdb_null,
217                                         NULL, NULL,
218                                         NULL) != 0) {
219                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
220
221                 talloc_free(tmp_ctx);
222                 return -1;
223         }
224
225         talloc_free(tmp_ctx);
226         return 0;
227 }
228
229 /*
230   remember the trouble maker
231  */
232 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
233 {
234         struct ctdb_context *ctdb = rec->ctdb;
235
236         if (rec->last_culprit != culprit ||
237             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
238                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
239                 /* either a new node is the culprit, or we've decided to forgive them */
240                 rec->last_culprit = culprit;
241                 rec->first_recover_time = timeval_current();
242                 rec->culprit_counter = 0;
243         }
244         rec->culprit_counter++;
245 }
246
247 /*
248   remember the trouble maker
249  */
250 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
251 {
252         struct ctdb_context *ctdb = rec->ctdb;
253
254         if (rec->last_culprit != culprit ||
255             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
256                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
257                 /* either a new node is the culprit, or we've decided to forgive them */
258                 rec->last_culprit = culprit;
259                 rec->first_recover_time = timeval_current();
260                 rec->culprit_counter = 0;
261         }
262         rec->culprit_counter += count;
263 }
264
265 /* this callback is called for every node that failed to execute the
266    start recovery event
267 */
268 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
269 {
270         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
271
272         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
273
274         ctdb_set_culprit(rec, node_pnn);
275 }
276
277 /*
278   run the "startrecovery" eventscript on all nodes
279  */
280 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
281 {
282         TALLOC_CTX *tmp_ctx;
283         uint32_t *nodes;
284         struct ctdb_context *ctdb = rec->ctdb;
285
286         tmp_ctx = talloc_new(ctdb);
287         CTDB_NO_MEMORY(ctdb, tmp_ctx);
288
289         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
290         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
291                                         nodes,
292                                         CONTROL_TIMEOUT(), false, tdb_null,
293                                         NULL,
294                                         startrecovery_fail_callback,
295                                         rec) != 0) {
296                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
297                 talloc_free(tmp_ctx);
298                 return -1;
299         }
300
301         talloc_free(tmp_ctx);
302         return 0;
303 }
304
305 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
306 {
307         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
308                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
309                 return;
310         }
311         if (node_pnn < ctdb->num_nodes) {
312                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
313         }
314 }
315
316 /*
317   update the node capabilities for all connected nodes
318  */
319 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
320 {
321         uint32_t *nodes;
322         TALLOC_CTX *tmp_ctx;
323
324         tmp_ctx = talloc_new(ctdb);
325         CTDB_NO_MEMORY(ctdb, tmp_ctx);
326
327         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
329                                         nodes, CONTROL_TIMEOUT(),
330                                         false, tdb_null,
331                                         async_getcap_callback, NULL,
332                                         NULL) != 0) {
333                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
334                 talloc_free(tmp_ctx);
335                 return -1;
336         }
337
338         talloc_free(tmp_ctx);
339         return 0;
340 }
341
342 /*
343   change recovery mode on all nodes
344  */
345 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
346 {
347         TDB_DATA data;
348         uint32_t *nodes;
349         TALLOC_CTX *tmp_ctx;
350
351         tmp_ctx = talloc_new(ctdb);
352         CTDB_NO_MEMORY(ctdb, tmp_ctx);
353
354         /* freeze all nodes */
355         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
356         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
357                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
358                                                 nodes, CONTROL_TIMEOUT(),
359                                                 false, tdb_null,
360                                                 NULL, NULL,
361                                                 NULL) != 0) {
362                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
363                         talloc_free(tmp_ctx);
364                         return -1;
365                 }
366         }
367
368
369         data.dsize = sizeof(uint32_t);
370         data.dptr = (unsigned char *)&rec_mode;
371
372         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
373                                         nodes, CONTROL_TIMEOUT(),
374                                         false, data,
375                                         NULL, NULL,
376                                         NULL) != 0) {
377                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
378                 talloc_free(tmp_ctx);
379                 return -1;
380         }
381
382         talloc_free(tmp_ctx);
383         return 0;
384 }
385
386 /*
387   change recovery master on all node
388  */
389 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
390 {
391         TDB_DATA data;
392         TALLOC_CTX *tmp_ctx;
393         uint32_t *nodes;
394
395         tmp_ctx = talloc_new(ctdb);
396         CTDB_NO_MEMORY(ctdb, tmp_ctx);
397
398         data.dsize = sizeof(uint32_t);
399         data.dptr = (unsigned char *)&pnn;
400
401         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
402         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
403                                         nodes,
404                                         CONTROL_TIMEOUT(), false, data,
405                                         NULL, NULL,
406                                         NULL) != 0) {
407                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
408                 talloc_free(tmp_ctx);
409                 return -1;
410         }
411
412         talloc_free(tmp_ctx);
413         return 0;
414 }
415
416
417 /*
418   ensure all other nodes have attached to any databases that we have
419  */
420 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
421                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
422 {
423         int i, j, db, ret;
424         struct ctdb_dbid_map *remote_dbmap;
425
426         /* verify that all other nodes have all our databases */
427         for (j=0; j<nodemap->num; j++) {
428                 /* we dont need to ourself ourselves */
429                 if (nodemap->nodes[j].pnn == pnn) {
430                         continue;
431                 }
432                 /* dont check nodes that are unavailable */
433                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
434                         continue;
435                 }
436
437                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
438                                          mem_ctx, &remote_dbmap);
439                 if (ret != 0) {
440                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
441                         return -1;
442                 }
443
444                 /* step through all local databases */
445                 for (db=0; db<dbmap->num;db++) {
446                         const char *name;
447
448
449                         for (i=0;i<remote_dbmap->num;i++) {
450                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
451                                         break;
452                                 }
453                         }
454                         /* the remote node already have this database */
455                         if (i!=remote_dbmap->num) {
456                                 continue;
457                         }
458                         /* ok so we need to create this database */
459                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
460                                             mem_ctx, &name);
461                         if (ret != 0) {
462                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
463                                 return -1;
464                         }
465                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
466                                            mem_ctx, name, dbmap->dbs[db].persistent);
467                         if (ret != 0) {
468                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
469                                 return -1;
470                         }
471                 }
472         }
473
474         return 0;
475 }
476
477
478 /*
479   ensure we are attached to any databases that anyone else is attached to
480  */
481 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
482                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
483 {
484         int i, j, db, ret;
485         struct ctdb_dbid_map *remote_dbmap;
486
487         /* verify that we have all database any other node has */
488         for (j=0; j<nodemap->num; j++) {
489                 /* we dont need to ourself ourselves */
490                 if (nodemap->nodes[j].pnn == pnn) {
491                         continue;
492                 }
493                 /* dont check nodes that are unavailable */
494                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
495                         continue;
496                 }
497
498                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
499                                          mem_ctx, &remote_dbmap);
500                 if (ret != 0) {
501                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
502                         return -1;
503                 }
504
505                 /* step through all databases on the remote node */
506                 for (db=0; db<remote_dbmap->num;db++) {
507                         const char *name;
508
509                         for (i=0;i<(*dbmap)->num;i++) {
510                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
511                                         break;
512                                 }
513                         }
514                         /* we already have this db locally */
515                         if (i!=(*dbmap)->num) {
516                                 continue;
517                         }
518                         /* ok so we need to create this database and
519                            rebuild dbmap
520                          */
521                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
522                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
523                         if (ret != 0) {
524                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
525                                           nodemap->nodes[j].pnn));
526                                 return -1;
527                         }
528                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
529                                            remote_dbmap->dbs[db].persistent);
530                         if (ret != 0) {
531                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
532                                 return -1;
533                         }
534                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
535                         if (ret != 0) {
536                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
537                                 return -1;
538                         }
539                 }
540         }
541
542         return 0;
543 }
544
545
546 /*
547   pull the remote database contents from one node into the recdb
548  */
549 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
550                                     struct tdb_wrap *recdb, uint32_t dbid)
551 {
552         int ret;
553         TDB_DATA outdata;
554         struct ctdb_marshall_buffer *reply;
555         struct ctdb_rec_data *rec;
556         int i;
557         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
558
559         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
560                                CONTROL_TIMEOUT(), &outdata);
561         if (ret != 0) {
562                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
563                 talloc_free(tmp_ctx);
564                 return -1;
565         }
566
567         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
568
569         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
570                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
571                 talloc_free(tmp_ctx);
572                 return -1;
573         }
574         
575         rec = (struct ctdb_rec_data *)&reply->data[0];
576         
577         for (i=0;
578              i<reply->count;
579              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
580                 TDB_DATA key, data;
581                 struct ctdb_ltdb_header *hdr;
582                 TDB_DATA existing;
583                 
584                 key.dptr = &rec->data[0];
585                 key.dsize = rec->keylen;
586                 data.dptr = &rec->data[key.dsize];
587                 data.dsize = rec->datalen;
588                 
589                 hdr = (struct ctdb_ltdb_header *)data.dptr;
590
591                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
592                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
593                         talloc_free(tmp_ctx);
594                         return -1;
595                 }
596
597                 /* fetch the existing record, if any */
598                 existing = tdb_fetch(recdb->tdb, key);
599                 
600                 if (existing.dptr != NULL) {
601                         struct ctdb_ltdb_header header;
602                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
603                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
604                                          (unsigned)existing.dsize, srcnode));
605                                 free(existing.dptr);
606                                 talloc_free(tmp_ctx);
607                                 return -1;
608                         }
609                         header = *(struct ctdb_ltdb_header *)existing.dptr;
610                         free(existing.dptr);
611                         if (!(header.rsn < hdr->rsn ||
612                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
613                                 continue;
614                         }
615                 }
616                 
617                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
618                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
619                         talloc_free(tmp_ctx);
620                         return -1;                              
621                 }
622         }
623
624         talloc_free(tmp_ctx);
625
626         return 0;
627 }
628
629 /*
630   pull all the remote database contents into the recdb
631  */
632 static int pull_remote_database(struct ctdb_context *ctdb,
633                                 struct ctdb_recoverd *rec, 
634                                 struct ctdb_node_map *nodemap, 
635                                 struct tdb_wrap *recdb, uint32_t dbid)
636 {
637         int j;
638
639         /* pull all records from all other nodes across onto this node
640            (this merges based on rsn)
641         */
642         for (j=0; j<nodemap->num; j++) {
643                 /* dont merge from nodes that are unavailable */
644                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
645                         continue;
646                 }
647                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
648                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
649                                  nodemap->nodes[j].pnn));
650                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
651                         return -1;
652                 }
653         }
654         
655         return 0;
656 }
657
658
659 /*
660   update flags on all active nodes
661  */
662 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
663 {
664         int ret;
665
666         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
667                 if (ret != 0) {
668                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
669                 return -1;
670         }
671
672         return 0;
673 }
674
675 /*
676   ensure all nodes have the same vnnmap we do
677  */
678 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
679                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
680 {
681         int j, ret;
682
683         /* push the new vnn map out to all the nodes */
684         for (j=0; j<nodemap->num; j++) {
685                 /* dont push to nodes that are unavailable */
686                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
687                         continue;
688                 }
689
690                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
691                 if (ret != 0) {
692                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
693                         return -1;
694                 }
695         }
696
697         return 0;
698 }
699
700
701 /*
702   handler for when the admin bans a node
703 */
704 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
705                         TDB_DATA data, void *private_data)
706 {
707         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
708         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
709         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
710
711         if (data.dsize != sizeof(*b)) {
712                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
713                 talloc_free(mem_ctx);
714                 return;
715         }
716
717         if (b->pnn != ctdb->pnn) {
718                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
719                 return;
720         }
721
722         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
723                  b->pnn, b->ban_time));
724
725         ctdb_ban_node(rec, b->pnn, b->ban_time);
726         talloc_free(mem_ctx);
727 }
728
729 /*
730   handler for when the admin unbans a node
731 */
732 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
733                           TDB_DATA data, void *private_data)
734 {
735         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
736         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
737         uint32_t pnn;
738
739         if (data.dsize != sizeof(uint32_t)) {
740                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
741                 talloc_free(mem_ctx);
742                 return;
743         }
744         pnn = *(uint32_t *)data.dptr;
745
746         if (pnn != ctdb->pnn) {
747                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
748                 return;
749         }
750
751         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
752         ctdb_unban_node(rec, pnn);
753         talloc_free(mem_ctx);
754 }
755
756
757 struct vacuum_info {
758         struct vacuum_info *next, *prev;
759         struct ctdb_recoverd *rec;
760         uint32_t srcnode;
761         struct ctdb_db_context *ctdb_db;
762         struct ctdb_marshall_buffer *recs;
763         struct ctdb_rec_data *r;
764 };
765
766 static void vacuum_fetch_next(struct vacuum_info *v);
767
768 /*
769   called when a vacuum fetch has completed - just free it and do the next one
770  */
771 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
772 {
773         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
774         talloc_free(state);
775         vacuum_fetch_next(v);
776 }
777
778
779 /*
780   process the next element from the vacuum list
781 */
782 static void vacuum_fetch_next(struct vacuum_info *v)
783 {
784         struct ctdb_call call;
785         struct ctdb_rec_data *r;
786
787         while (v->recs->count) {
788                 struct ctdb_client_call_state *state;
789                 TDB_DATA data;
790                 struct ctdb_ltdb_header *hdr;
791
792                 ZERO_STRUCT(call);
793                 call.call_id = CTDB_NULL_FUNC;
794                 call.flags = CTDB_IMMEDIATE_MIGRATION;
795
796                 r = v->r;
797                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
798                 v->recs->count--;
799
800                 call.key.dptr = &r->data[0];
801                 call.key.dsize = r->keylen;
802
803                 /* ensure we don't block this daemon - just skip a record if we can't get
804                    the chainlock */
805                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
806                         continue;
807                 }
808
809                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
810                 if (data.dptr == NULL) {
811                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
812                         continue;
813                 }
814
815                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
816                         free(data.dptr);
817                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
818                         continue;
819                 }
820                 
821                 hdr = (struct ctdb_ltdb_header *)data.dptr;
822                 if (hdr->dmaster == v->rec->ctdb->pnn) {
823                         /* its already local */
824                         free(data.dptr);
825                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
826                         continue;
827                 }
828
829                 free(data.dptr);
830
831                 state = ctdb_call_send(v->ctdb_db, &call);
832                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
833                 if (state == NULL) {
834                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
835                         talloc_free(v);
836                         return;
837                 }
838                 state->async.fn = vacuum_fetch_callback;
839                 state->async.private_data = v;
840                 return;
841         }
842
843         talloc_free(v);
844 }
845
846
847 /*
848   destroy a vacuum info structure
849  */
850 static int vacuum_info_destructor(struct vacuum_info *v)
851 {
852         DLIST_REMOVE(v->rec->vacuum_info, v);
853         return 0;
854 }
855
856
857 /*
858   handler for vacuum fetch
859 */
860 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
861                                  TDB_DATA data, void *private_data)
862 {
863         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
864         struct ctdb_marshall_buffer *recs;
865         int ret, i;
866         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
867         const char *name;
868         struct ctdb_dbid_map *dbmap=NULL;
869         bool persistent = false;
870         struct ctdb_db_context *ctdb_db;
871         struct ctdb_rec_data *r;
872         uint32_t srcnode;
873         struct vacuum_info *v;
874
875         recs = (struct ctdb_marshall_buffer *)data.dptr;
876         r = (struct ctdb_rec_data *)&recs->data[0];
877
878         if (recs->count == 0) {
879                 talloc_free(tmp_ctx);
880                 return;
881         }
882
883         srcnode = r->reqid;
884
885         for (v=rec->vacuum_info;v;v=v->next) {
886                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
887                         /* we're already working on records from this node */
888                         talloc_free(tmp_ctx);
889                         return;
890                 }
891         }
892
893         /* work out if the database is persistent */
894         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
895         if (ret != 0) {
896                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
897                 talloc_free(tmp_ctx);
898                 return;
899         }
900
901         for (i=0;i<dbmap->num;i++) {
902                 if (dbmap->dbs[i].dbid == recs->db_id) {
903                         persistent = dbmap->dbs[i].persistent;
904                         break;
905                 }
906         }
907         if (i == dbmap->num) {
908                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
909                 talloc_free(tmp_ctx);
910                 return;         
911         }
912
913         /* find the name of this database */
914         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
915                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
916                 talloc_free(tmp_ctx);
917                 return;
918         }
919
920         /* attach to it */
921         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
922         if (ctdb_db == NULL) {
923                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
924                 talloc_free(tmp_ctx);
925                 return;
926         }
927
928         v = talloc_zero(rec, struct vacuum_info);
929         if (v == NULL) {
930                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
931                 talloc_free(tmp_ctx);
932                 return;
933         }
934
935         v->rec = rec;
936         v->srcnode = srcnode;
937         v->ctdb_db = ctdb_db;
938         v->recs = talloc_memdup(v, recs, data.dsize);
939         if (v->recs == NULL) {
940                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
941                 talloc_free(v);
942                 talloc_free(tmp_ctx);
943                 return;         
944         }
945         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
946
947         DLIST_ADD(rec->vacuum_info, v);
948
949         talloc_set_destructor(v, vacuum_info_destructor);
950
951         vacuum_fetch_next(v);
952         talloc_free(tmp_ctx);
953 }
954
955
956 /*
957   called when ctdb_wait_timeout should finish
958  */
959 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
960                               struct timeval yt, void *p)
961 {
962         uint32_t *timed_out = (uint32_t *)p;
963         (*timed_out) = 1;
964 }
965
966 /*
967   wait for a given number of seconds
968  */
969 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
970 {
971         uint32_t timed_out = 0;
972         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
973         while (!timed_out) {
974                 event_loop_once(ctdb->ev);
975         }
976 }
977
978 /*
979   called when an election times out (ends)
980  */
981 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
982                                   struct timeval t, void *p)
983 {
984         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
985         rec->election_timeout = NULL;
986 }
987
988
989 /*
990   wait for an election to finish. It finished election_timeout seconds after
991   the last election packet is received
992  */
993 static void ctdb_wait_election(struct ctdb_recoverd *rec)
994 {
995         struct ctdb_context *ctdb = rec->ctdb;
996         while (rec->election_timeout) {
997                 event_loop_once(ctdb->ev);
998         }
999 }
1000
1001 /*
1002   Update our local flags from all remote connected nodes. 
1003   This is only run when we are or we belive we are the recovery master
1004  */
1005 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1006 {
1007         int j;
1008         struct ctdb_context *ctdb = rec->ctdb;
1009         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1010
1011         /* get the nodemap for all active remote nodes and verify
1012            they are the same as for this node
1013          */
1014         for (j=0; j<nodemap->num; j++) {
1015                 struct ctdb_node_map *remote_nodemap=NULL;
1016                 int ret;
1017
1018                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1019                         continue;
1020                 }
1021                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1022                         continue;
1023                 }
1024
1025                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1026                                            mem_ctx, &remote_nodemap);
1027                 if (ret != 0) {
1028                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1029                                   nodemap->nodes[j].pnn));
1030                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1031                         talloc_free(mem_ctx);
1032                         return MONITOR_FAILED;
1033                 }
1034                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1035                         int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
1036
1037                         if (ban_changed) {
1038                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1039                                 nodemap->nodes[j].pnn,
1040                                 remote_nodemap->nodes[j].flags,
1041                                 nodemap->nodes[j].flags));
1042                         }
1043
1044                         /* We should tell our daemon about this so it
1045                            updates its flags or else we will log the same 
1046                            message again in the next iteration of recovery.
1047                            Since we are the recovery master we can just as
1048                            well update the flags on all nodes.
1049                         */
1050                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1051                         if (ret != 0) {
1052                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1053                                 return -1;
1054                         }
1055
1056                         /* Update our local copy of the flags in the recovery
1057                            daemon.
1058                         */
1059                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1060                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1061                                  nodemap->nodes[j].flags));
1062                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1063
1064                         /* If the BANNED flag has changed for the node
1065                            this is a good reason to do a new election.
1066                          */
1067                         if (ban_changed) {
1068                                 talloc_free(mem_ctx);
1069                                 return MONITOR_ELECTION_NEEDED;
1070                         }
1071
1072                 }
1073                 talloc_free(remote_nodemap);
1074         }
1075         talloc_free(mem_ctx);
1076         return MONITOR_OK;
1077 }
1078
1079
1080 /* Create a new random generation ip. 
1081    The generation id can not be the INVALID_GENERATION id
1082 */
1083 static uint32_t new_generation(void)
1084 {
1085         uint32_t generation;
1086
1087         while (1) {
1088                 generation = random();
1089
1090                 if (generation != INVALID_GENERATION) {
1091                         break;
1092                 }
1093         }
1094
1095         return generation;
1096 }
1097
1098
1099 /*
1100   create a temporary working database
1101  */
1102 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1103 {
1104         char *name;
1105         struct tdb_wrap *recdb;
1106         unsigned tdb_flags;
1107
1108         /* open up the temporary recovery database */
1109         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1110         if (name == NULL) {
1111                 return NULL;
1112         }
1113         unlink(name);
1114
1115         tdb_flags = TDB_NOLOCK;
1116         if (!ctdb->do_setsched) {
1117                 tdb_flags |= TDB_NOMMAP;
1118         }
1119
1120         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1121                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1122         if (recdb == NULL) {
1123                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1124         }
1125
1126         talloc_free(name);
1127
1128         return recdb;
1129 }
1130
1131
1132 /* 
1133    a traverse function for pulling all relevent records from recdb
1134  */
1135 struct recdb_data {
1136         struct ctdb_context *ctdb;
1137         struct ctdb_marshall_buffer *recdata;
1138         uint32_t len;
1139         bool failed;
1140 };
1141
1142 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1143 {
1144         struct recdb_data *params = (struct recdb_data *)p;
1145         struct ctdb_rec_data *rec;
1146         struct ctdb_ltdb_header *hdr;
1147
1148         /* skip empty records */
1149         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1150                 return 0;
1151         }
1152
1153         /* update the dmaster field to point to us */
1154         hdr = (struct ctdb_ltdb_header *)data.dptr;
1155         hdr->dmaster = params->ctdb->pnn;
1156
1157         /* add the record to the blob ready to send to the nodes */
1158         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1159         if (rec == NULL) {
1160                 params->failed = true;
1161                 return -1;
1162         }
1163         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1164         if (params->recdata == NULL) {
1165                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1166                          rec->length + params->len, params->recdata->count));
1167                 params->failed = true;
1168                 return -1;
1169         }
1170         params->recdata->count++;
1171         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1172         params->len += rec->length;
1173         talloc_free(rec);
1174
1175         return 0;
1176 }
1177
1178 /*
1179   push the recdb database out to all nodes
1180  */
1181 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1182                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1183 {
1184         struct recdb_data params;
1185         struct ctdb_marshall_buffer *recdata;
1186         TDB_DATA outdata;
1187         TALLOC_CTX *tmp_ctx;
1188         uint32_t *nodes;
1189
1190         tmp_ctx = talloc_new(ctdb);
1191         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1192
1193         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1194         CTDB_NO_MEMORY(ctdb, recdata);
1195
1196         recdata->db_id = dbid;
1197
1198         params.ctdb = ctdb;
1199         params.recdata = recdata;
1200         params.len = offsetof(struct ctdb_marshall_buffer, data);
1201         params.failed = false;
1202
1203         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1204                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1205                 talloc_free(params.recdata);
1206                 talloc_free(tmp_ctx);
1207                 return -1;
1208         }
1209
1210         if (params.failed) {
1211                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1212                 talloc_free(params.recdata);
1213                 talloc_free(tmp_ctx);
1214                 return -1;              
1215         }
1216
1217         recdata = params.recdata;
1218
1219         outdata.dptr = (void *)recdata;
1220         outdata.dsize = params.len;
1221
1222         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1223         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1224                                         nodes,
1225                                         CONTROL_TIMEOUT(), false, outdata,
1226                                         NULL, NULL,
1227                                         NULL) != 0) {
1228                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1229                 talloc_free(recdata);
1230                 talloc_free(tmp_ctx);
1231                 return -1;
1232         }
1233
1234         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1235                   dbid, recdata->count));
1236
1237         talloc_free(recdata);
1238         talloc_free(tmp_ctx);
1239
1240         return 0;
1241 }
1242
1243
1244 /*
1245   go through a full recovery on one database 
1246  */
1247 static int recover_database(struct ctdb_recoverd *rec, 
1248                             TALLOC_CTX *mem_ctx,
1249                             uint32_t dbid,
1250                             uint32_t pnn, 
1251                             struct ctdb_node_map *nodemap,
1252                             uint32_t transaction_id)
1253 {
1254         struct tdb_wrap *recdb;
1255         int ret;
1256         struct ctdb_context *ctdb = rec->ctdb;
1257         TDB_DATA data;
1258         struct ctdb_control_wipe_database w;
1259         uint32_t *nodes;
1260
1261         recdb = create_recdb(ctdb, mem_ctx);
1262         if (recdb == NULL) {
1263                 return -1;
1264         }
1265
1266         /* pull all remote databases onto the recdb */
1267         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1268         if (ret != 0) {
1269                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1270                 return -1;
1271         }
1272
1273         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1274
1275         /* wipe all the remote databases. This is safe as we are in a transaction */
1276         w.db_id = dbid;
1277         w.transaction_id = transaction_id;
1278
1279         data.dptr = (void *)&w;
1280         data.dsize = sizeof(w);
1281
1282         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1283         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1284                                         nodes,
1285                                         CONTROL_TIMEOUT(), false, data,
1286                                         NULL, NULL,
1287                                         NULL) != 0) {
1288                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1289                 talloc_free(recdb);
1290                 return -1;
1291         }
1292         
1293         /* push out the correct database. This sets the dmaster and skips 
1294            the empty records */
1295         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1296         if (ret != 0) {
1297                 talloc_free(recdb);
1298                 return -1;
1299         }
1300
1301         /* all done with this database */
1302         talloc_free(recdb);
1303
1304         return 0;
1305 }
1306
1307 /*
1308   reload the nodes file 
1309 */
1310 static void reload_nodes_file(struct ctdb_context *ctdb)
1311 {
1312         ctdb->nodes = NULL;
1313         ctdb_load_nodes_file(ctdb);
1314 }
1315
1316         
1317 /*
1318   we are the recmaster, and recovery is needed - start a recovery run
1319  */
1320 static int do_recovery(struct ctdb_recoverd *rec, 
1321                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1322                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1323                        int32_t culprit)
1324 {
1325         struct ctdb_context *ctdb = rec->ctdb;
1326         int i, j, ret;
1327         uint32_t generation;
1328         struct ctdb_dbid_map *dbmap;
1329         TDB_DATA data;
1330         uint32_t *nodes;
1331         struct timeval start_time;
1332
1333         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1334
1335         /* if recovery fails, force it again */
1336         rec->need_recovery = true;
1337
1338         if (culprit != -1) {
1339                 ctdb_set_culprit(rec, culprit);
1340         }
1341
1342         if (rec->culprit_counter > 2*nodemap->num) {
1343                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1344                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1345                          ctdb->tunable.recovery_ban_period));
1346                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
1347         }
1348
1349
1350         if (ctdb->tunable.verify_recovery_lock != 0) {
1351                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1352                 start_time = timeval_current();
1353                 if (!ctdb_recovery_lock(ctdb, true)) {
1354                         ctdb_set_culprit(rec, pnn);
1355                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1356                         return -1;
1357                 }
1358                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1359                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1360         }
1361
1362         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1363
1364         /* get a list of all databases */
1365         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1366         if (ret != 0) {
1367                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1368                 return -1;
1369         }
1370
1371         /* we do the db creation before we set the recovery mode, so the freeze happens
1372            on all databases we will be dealing with. */
1373
1374         /* verify that we have all the databases any other node has */
1375         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1376         if (ret != 0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1378                 return -1;
1379         }
1380
1381         /* verify that all other nodes have all our databases */
1382         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1383         if (ret != 0) {
1384                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1385                 return -1;
1386         }
1387
1388         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1389
1390
1391         /* set recovery mode to active on all nodes */
1392         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1393         if (ret != 0) {
1394                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1395                 return -1;
1396         }
1397
1398         /* execute the "startrecovery" event script on all nodes */
1399         ret = run_startrecovery_eventscript(rec, nodemap);
1400         if (ret!=0) {
1401                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1402                 return -1;
1403         }
1404
1405         /* pick a new generation number */
1406         generation = new_generation();
1407
1408         /* change the vnnmap on this node to use the new generation 
1409            number but not on any other nodes.
1410            this guarantees that if we abort the recovery prematurely
1411            for some reason (a node stops responding?)
1412            that we can just return immediately and we will reenter
1413            recovery shortly again.
1414            I.e. we deliberately leave the cluster with an inconsistent
1415            generation id to allow us to abort recovery at any stage and
1416            just restart it from scratch.
1417          */
1418         vnnmap->generation = generation;
1419         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1422                 return -1;
1423         }
1424
1425         data.dptr = (void *)&generation;
1426         data.dsize = sizeof(uint32_t);
1427
1428         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1429         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1430                                         nodes,
1431                                         CONTROL_TIMEOUT(), false, data,
1432                                         NULL, NULL,
1433                                         NULL) != 0) {
1434                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1435                 return -1;
1436         }
1437
1438         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1439
1440         for (i=0;i<dbmap->num;i++) {
1441                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1442                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1443                         return -1;
1444                 }
1445         }
1446
1447         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1448
1449         /* commit all the changes */
1450         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1451                                         nodes,
1452                                         CONTROL_TIMEOUT(), false, data,
1453                                         NULL, NULL,
1454                                         NULL) != 0) {
1455                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1456                 return -1;
1457         }
1458
1459         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1460         
1461
1462         /* update the capabilities for all nodes */
1463         ret = update_capabilities(ctdb, nodemap);
1464         if (ret!=0) {
1465                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1466                 return -1;
1467         }
1468
1469         /* build a new vnn map with all the currently active and
1470            unbanned nodes */
1471         generation = new_generation();
1472         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1473         CTDB_NO_MEMORY(ctdb, vnnmap);
1474         vnnmap->generation = generation;
1475         vnnmap->size = 0;
1476         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1477         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1478         for (i=j=0;i<nodemap->num;i++) {
1479                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1480                         continue;
1481                 }
1482                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1483                         /* this node can not be an lmaster */
1484                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1485                         continue;
1486                 }
1487
1488                 vnnmap->size++;
1489                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1490                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1491                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1492
1493         }
1494         if (vnnmap->size == 0) {
1495                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1496                 vnnmap->size++;
1497                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1498                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1499                 vnnmap->map[0] = pnn;
1500         }       
1501
1502         /* update to the new vnnmap on all nodes */
1503         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1504         if (ret != 0) {
1505                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1506                 return -1;
1507         }
1508
1509         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1510
1511         /* update recmaster to point to us for all nodes */
1512         ret = set_recovery_master(ctdb, nodemap, pnn);
1513         if (ret!=0) {
1514                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1515                 return -1;
1516         }
1517
1518         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1519
1520         /*
1521           update all nodes to have the same flags that we have
1522          */
1523         for (i=0;i<nodemap->num;i++) {
1524                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1525                         continue;
1526                 }
1527
1528                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1529                 if (ret != 0) {
1530                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1531                         return -1;
1532                 }
1533         }
1534
1535         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1536
1537         /* disable recovery mode */
1538         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1539         if (ret != 0) {
1540                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1541                 return -1;
1542         }
1543
1544         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1545
1546         /*
1547           tell nodes to takeover their public IPs
1548          */
1549         rec->need_takeover_run = false;
1550         ret = ctdb_takeover_run(ctdb, nodemap);
1551         if (ret != 0) {
1552                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1553                 return -1;
1554         }
1555         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1556
1557         /* execute the "recovered" event script on all nodes */
1558         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1559         if (ret!=0) {
1560                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1561                 return -1;
1562         }
1563
1564         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1565
1566         /* send a message to all clients telling them that the cluster 
1567            has been reconfigured */
1568         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1569
1570         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1571
1572         rec->need_recovery = false;
1573
1574         /* We just finished a recovery successfully. 
1575            We now wait for rerecovery_timeout before we allow 
1576            another recovery to take place.
1577         */
1578         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1579         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1580         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1581
1582         return 0;
1583 }
1584
1585
1586 /*
1587   elections are won by first checking the number of connected nodes, then
1588   the priority time, then the pnn
1589  */
1590 struct election_message {
1591         uint32_t num_connected;
1592         struct timeval priority_time;
1593         uint32_t pnn;
1594         uint32_t node_flags;
1595 };
1596
1597 /*
1598   form this nodes election data
1599  */
1600 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1601 {
1602         int ret, i;
1603         struct ctdb_node_map *nodemap;
1604         struct ctdb_context *ctdb = rec->ctdb;
1605
1606         ZERO_STRUCTP(em);
1607
1608         em->pnn = rec->ctdb->pnn;
1609         em->priority_time = rec->priority_time;
1610         em->node_flags = rec->node_flags;
1611
1612         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1615                 return;
1616         }
1617
1618         for (i=0;i<nodemap->num;i++) {
1619                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1620                         em->num_connected++;
1621                 }
1622         }
1623
1624         /* we shouldnt try to win this election if we cant be a recmaster */
1625         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1626                 em->num_connected = 0;
1627                 em->priority_time = timeval_current();
1628         }
1629
1630         talloc_free(nodemap);
1631 }
1632
1633 /*
1634   see if the given election data wins
1635  */
1636 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1637 {
1638         struct election_message myem;
1639         int cmp = 0;
1640
1641         ctdb_election_data(rec, &myem);
1642
1643         /* we cant win if we dont have the recmaster capability */
1644         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1645                 return false;
1646         }
1647
1648         /* we cant win if we are banned */
1649         if (rec->node_flags & NODE_FLAGS_BANNED) {
1650                 return false;
1651         }       
1652
1653         /* we will automatically win if the other node is banned */
1654         if (em->node_flags & NODE_FLAGS_BANNED) {
1655                 return true;
1656         }
1657
1658         /* try to use the most connected node */
1659         if (cmp == 0) {
1660                 cmp = (int)myem.num_connected - (int)em->num_connected;
1661         }
1662
1663         /* then the longest running node */
1664         if (cmp == 0) {
1665                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1666         }
1667
1668         if (cmp == 0) {
1669                 cmp = (int)myem.pnn - (int)em->pnn;
1670         }
1671
1672         return cmp > 0;
1673 }
1674
1675 /*
1676   send out an election request
1677  */
1678 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1679 {
1680         int ret;
1681         TDB_DATA election_data;
1682         struct election_message emsg;
1683         uint64_t srvid;
1684         struct ctdb_context *ctdb = rec->ctdb;
1685
1686         srvid = CTDB_SRVID_RECOVERY;
1687
1688         ctdb_election_data(rec, &emsg);
1689
1690         election_data.dsize = sizeof(struct election_message);
1691         election_data.dptr  = (unsigned char *)&emsg;
1692
1693
1694         /* send an election message to all active nodes */
1695         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1696
1697
1698         /* A new node that is already frozen has entered the cluster.
1699            The existing nodes are not frozen and dont need to be frozen
1700            until the election has ended and we start the actual recovery
1701         */
1702         if (update_recmaster == true) {
1703                 /* first we assume we will win the election and set 
1704                    recoverymaster to be ourself on the current node
1705                  */
1706                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1707                 if (ret != 0) {
1708                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1709                         return -1;
1710                 }
1711         }
1712
1713
1714         return 0;
1715 }
1716
1717 /*
1718   this function will unban all nodes in the cluster
1719 */
1720 static void unban_all_nodes(struct ctdb_context *ctdb)
1721 {
1722         int ret, i;
1723         struct ctdb_node_map *nodemap;
1724         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1725         
1726         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1727         if (ret != 0) {
1728                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1729                 return;
1730         }
1731
1732         for (i=0;i<nodemap->num;i++) {
1733                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1734                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1735                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1736                 }
1737         }
1738
1739         talloc_free(tmp_ctx);
1740 }
1741
1742
1743 /*
1744   we think we are winning the election - send a broadcast election request
1745  */
1746 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1747 {
1748         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1749         int ret;
1750
1751         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1752         if (ret != 0) {
1753                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1754         }
1755
1756         talloc_free(rec->send_election_te);
1757         rec->send_election_te = NULL;
1758 }
1759
1760 /*
1761   handler for memory dumps
1762 */
1763 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1764                              TDB_DATA data, void *private_data)
1765 {
1766         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1767         TDB_DATA *dump;
1768         int ret;
1769         struct rd_memdump_reply *rd;
1770
1771         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1772                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1773                 talloc_free(tmp_ctx);
1774                 return;
1775         }
1776         rd = (struct rd_memdump_reply *)data.dptr;
1777
1778         dump = talloc_zero(tmp_ctx, TDB_DATA);
1779         if (dump == NULL) {
1780                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1781                 talloc_free(tmp_ctx);
1782                 return;
1783         }
1784         ret = ctdb_dump_memory(ctdb, dump);
1785         if (ret != 0) {
1786                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1787                 talloc_free(tmp_ctx);
1788                 return;
1789         }
1790
1791 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1792
1793         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1794         if (ret != 0) {
1795                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1796                 talloc_free(tmp_ctx);
1797                 return;
1798         }
1799
1800         talloc_free(tmp_ctx);
1801 }
1802
1803 /*
1804   handler for reload_nodes
1805 */
1806 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1807                              TDB_DATA data, void *private_data)
1808 {
1809         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1810
1811         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1812
1813         reload_nodes_file(rec->ctdb);
1814 }
1815
1816
1817
1818 /*
1819   handler for recovery master elections
1820 */
1821 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1822                              TDB_DATA data, void *private_data)
1823 {
1824         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1825         int ret;
1826         struct election_message *em = (struct election_message *)data.dptr;
1827         TALLOC_CTX *mem_ctx;
1828
1829         /* we got an election packet - update the timeout for the election */
1830         talloc_free(rec->election_timeout);
1831         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1832                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1833                                                 ctdb_election_timeout, rec);
1834
1835         mem_ctx = talloc_new(ctdb);
1836
1837         /* someone called an election. check their election data
1838            and if we disagree and we would rather be the elected node, 
1839            send a new election message to all other nodes
1840          */
1841         if (ctdb_election_win(rec, em)) {
1842                 if (!rec->send_election_te) {
1843                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1844                                                                 timeval_current_ofs(0, 500000),
1845                                                                 election_send_request, rec);
1846                 }
1847                 talloc_free(mem_ctx);
1848                 /*unban_all_nodes(ctdb);*/
1849                 return;
1850         }
1851         
1852         /* we didn't win */
1853         talloc_free(rec->send_election_te);
1854         rec->send_election_te = NULL;
1855
1856         if (ctdb->tunable.verify_recovery_lock != 0) {
1857                 /* release the recmaster lock */
1858                 if (em->pnn != ctdb->pnn &&
1859                     ctdb->recovery_lock_fd != -1) {
1860                         close(ctdb->recovery_lock_fd);
1861                         ctdb->recovery_lock_fd = -1;
1862                         unban_all_nodes(ctdb);
1863                 }
1864         }
1865
1866         /* ok, let that guy become recmaster then */
1867         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1870                 talloc_free(mem_ctx);
1871                 return;
1872         }
1873
1874         /* release any bans */
1875         rec->last_culprit = (uint32_t)-1;
1876         talloc_free(rec->banned_nodes);
1877         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1878         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1879
1880         talloc_free(mem_ctx);
1881         return;
1882 }
1883
1884
1885 /*
1886   force the start of the election process
1887  */
1888 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1889                            struct ctdb_node_map *nodemap)
1890 {
1891         int ret;
1892         struct ctdb_context *ctdb = rec->ctdb;
1893
1894         /* set all nodes to recovery mode to stop all internode traffic */
1895         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1896         if (ret != 0) {
1897                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1898                 return;
1899         }
1900
1901         talloc_free(rec->election_timeout);
1902         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1903                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1904                                                 ctdb_election_timeout, rec);
1905
1906         ret = send_election_request(rec, pnn, true);
1907         if (ret!=0) {
1908                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1909                 return;
1910         }
1911
1912         /* wait for a few seconds to collect all responses */
1913         ctdb_wait_election(rec);
1914 }
1915
1916
1917
1918 /*
1919   handler for when a node changes its flags
1920 */
1921 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1922                             TDB_DATA data, void *private_data)
1923 {
1924         int ret;
1925         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1926         struct ctdb_node_map *nodemap=NULL;
1927         TALLOC_CTX *tmp_ctx;
1928         uint32_t changed_flags;
1929         int i;
1930         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1931
1932         if (data.dsize != sizeof(*c)) {
1933                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1934                 return;
1935         }
1936
1937         tmp_ctx = talloc_new(ctdb);
1938         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1939
1940         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1941         if (ret != 0) {
1942                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1943                 talloc_free(tmp_ctx);
1944                 return;         
1945         }
1946
1947
1948         for (i=0;i<nodemap->num;i++) {
1949                 if (nodemap->nodes[i].pnn == c->pnn) break;
1950         }
1951
1952         if (i == nodemap->num) {
1953                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1954                 talloc_free(tmp_ctx);
1955                 return;
1956         }
1957
1958         changed_flags = c->old_flags ^ c->new_flags;
1959
1960         if (nodemap->nodes[i].flags != c->new_flags) {
1961                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1962         }
1963
1964         nodemap->nodes[i].flags = c->new_flags;
1965
1966         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1967                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1968
1969         if (ret == 0) {
1970                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1971                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1972         }
1973         
1974         if (ret == 0 &&
1975             ctdb->recovery_master == ctdb->pnn &&
1976             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1977                 /* Only do the takeover run if the perm disabled or unhealthy
1978                    flags changed since these will cause an ip failover but not
1979                    a recovery.
1980                    If the node became disconnected or banned this will also
1981                    lead to an ip address failover but that is handled 
1982                    during recovery
1983                 */
1984                 if (changed_flags & NODE_FLAGS_DISABLED) {
1985                         rec->need_takeover_run = true;
1986                 }
1987         }
1988
1989         talloc_free(tmp_ctx);
1990 }
1991
1992 /*
1993   handler for when we need to push out flag changes ot all other nodes
1994 */
1995 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1996                             TDB_DATA data, void *private_data)
1997 {
1998         int ret;
1999         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2000
2001         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
2002         if (ret != 0) {
2003                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2004         }
2005 }
2006
2007
2008 struct verify_recmode_normal_data {
2009         uint32_t count;
2010         enum monitor_result status;
2011 };
2012
2013 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2014 {
2015         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2016
2017
2018         /* one more node has responded with recmode data*/
2019         rmdata->count--;
2020
2021         /* if we failed to get the recmode, then return an error and let
2022            the main loop try again.
2023         */
2024         if (state->state != CTDB_CONTROL_DONE) {
2025                 if (rmdata->status == MONITOR_OK) {
2026                         rmdata->status = MONITOR_FAILED;
2027                 }
2028                 return;
2029         }
2030
2031         /* if we got a response, then the recmode will be stored in the
2032            status field
2033         */
2034         if (state->status != CTDB_RECOVERY_NORMAL) {
2035                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2036                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2037         }
2038
2039         return;
2040 }
2041
2042
2043 /* verify that all nodes are in normal recovery mode */
2044 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2045 {
2046         struct verify_recmode_normal_data *rmdata;
2047         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2048         struct ctdb_client_control_state *state;
2049         enum monitor_result status;
2050         int j;
2051         
2052         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2053         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2054         rmdata->count  = 0;
2055         rmdata->status = MONITOR_OK;
2056
2057         /* loop over all active nodes and send an async getrecmode call to 
2058            them*/
2059         for (j=0; j<nodemap->num; j++) {
2060                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2061                         continue;
2062                 }
2063                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2064                                         CONTROL_TIMEOUT(), 
2065                                         nodemap->nodes[j].pnn);
2066                 if (state == NULL) {
2067                         /* we failed to send the control, treat this as 
2068                            an error and try again next iteration
2069                         */                      
2070                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2071                         talloc_free(mem_ctx);
2072                         return MONITOR_FAILED;
2073                 }
2074
2075                 /* set up the callback functions */
2076                 state->async.fn = verify_recmode_normal_callback;
2077                 state->async.private_data = rmdata;
2078
2079                 /* one more control to wait for to complete */
2080                 rmdata->count++;
2081         }
2082
2083
2084         /* now wait for up to the maximum number of seconds allowed
2085            or until all nodes we expect a response from has replied
2086         */
2087         while (rmdata->count > 0) {
2088                 event_loop_once(ctdb->ev);
2089         }
2090
2091         status = rmdata->status;
2092         talloc_free(mem_ctx);
2093         return status;
2094 }
2095
2096
2097 struct verify_recmaster_data {
2098         struct ctdb_recoverd *rec;
2099         uint32_t count;
2100         uint32_t pnn;
2101         enum monitor_result status;
2102 };
2103
2104 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2105 {
2106         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2107
2108
2109         /* one more node has responded with recmaster data*/
2110         rmdata->count--;
2111
2112         /* if we failed to get the recmaster, then return an error and let
2113            the main loop try again.
2114         */
2115         if (state->state != CTDB_CONTROL_DONE) {
2116                 if (rmdata->status == MONITOR_OK) {
2117                         rmdata->status = MONITOR_FAILED;
2118                 }
2119                 return;
2120         }
2121
2122         /* if we got a response, then the recmaster will be stored in the
2123            status field
2124         */
2125         if (state->status != rmdata->pnn) {
2126                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2127                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2128                 rmdata->status = MONITOR_ELECTION_NEEDED;
2129         }
2130
2131         return;
2132 }
2133
2134
2135 /* verify that all nodes agree that we are the recmaster */
2136 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2137 {
2138         struct ctdb_context *ctdb = rec->ctdb;
2139         struct verify_recmaster_data *rmdata;
2140         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2141         struct ctdb_client_control_state *state;
2142         enum monitor_result status;
2143         int j;
2144         
2145         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2146         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2147         rmdata->rec    = rec;
2148         rmdata->count  = 0;
2149         rmdata->pnn    = pnn;
2150         rmdata->status = MONITOR_OK;
2151
2152         /* loop over all active nodes and send an async getrecmaster call to 
2153            them*/
2154         for (j=0; j<nodemap->num; j++) {
2155                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2156                         continue;
2157                 }
2158                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2159                                         CONTROL_TIMEOUT(),
2160                                         nodemap->nodes[j].pnn);
2161                 if (state == NULL) {
2162                         /* we failed to send the control, treat this as 
2163                            an error and try again next iteration
2164                         */                      
2165                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2166                         talloc_free(mem_ctx);
2167                         return MONITOR_FAILED;
2168                 }
2169
2170                 /* set up the callback functions */
2171                 state->async.fn = verify_recmaster_callback;
2172                 state->async.private_data = rmdata;
2173
2174                 /* one more control to wait for to complete */
2175                 rmdata->count++;
2176         }
2177
2178
2179         /* now wait for up to the maximum number of seconds allowed
2180            or until all nodes we expect a response from has replied
2181         */
2182         while (rmdata->count > 0) {
2183                 event_loop_once(ctdb->ev);
2184         }
2185
2186         status = rmdata->status;
2187         talloc_free(mem_ctx);
2188         return status;
2189 }
2190
2191
2192 /* called to check that the allocation of public ip addresses is ok.
2193 */
2194 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2195 {
2196         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2197         struct ctdb_all_public_ips *ips = NULL;
2198         struct ctdb_uptime *uptime1 = NULL;
2199         struct ctdb_uptime *uptime2 = NULL;
2200         int ret, j;
2201
2202         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2203                                 CTDB_CURRENT_NODE, &uptime1);
2204         if (ret != 0) {
2205                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2206                 talloc_free(mem_ctx);
2207                 return -1;
2208         }
2209
2210         /* read the ip allocation from the local node */
2211         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2212         if (ret != 0) {
2213                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2214                 talloc_free(mem_ctx);
2215                 return -1;
2216         }
2217
2218         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2219                                 CTDB_CURRENT_NODE, &uptime2);
2220         if (ret != 0) {
2221                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2222                 talloc_free(mem_ctx);
2223                 return -1;
2224         }
2225
2226         /* skip the check if the startrecovery time has changed */
2227         if (timeval_compare(&uptime1->last_recovery_started,
2228                             &uptime2->last_recovery_started) != 0) {
2229                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2230                 talloc_free(mem_ctx);
2231                 return 0;
2232         }
2233
2234         /* skip the check if the endrecovery time has changed */
2235         if (timeval_compare(&uptime1->last_recovery_finished,
2236                             &uptime2->last_recovery_finished) != 0) {
2237                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2238                 talloc_free(mem_ctx);
2239                 return 0;
2240         }
2241
2242         /* skip the check if we have started but not finished recovery */
2243         if (timeval_compare(&uptime1->last_recovery_finished,
2244                             &uptime1->last_recovery_started) != 1) {
2245                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2246                 talloc_free(mem_ctx);
2247
2248                 return 0;
2249         }
2250
2251         /* verify that we have the ip addresses we should have
2252            and we dont have ones we shouldnt have.
2253            if we find an inconsistency we set recmode to
2254            active on the local node and wait for the recmaster
2255            to do a full blown recovery
2256         */
2257         for (j=0; j<ips->num; j++) {
2258                 if (ips->ips[j].pnn == pnn) {
2259                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2260                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2261                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2262                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2263                                 if (ret != 0) {
2264                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2265
2266                                         talloc_free(mem_ctx);
2267                                         return -1;
2268                                 }
2269                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2270                                 if (ret != 0) {
2271                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2272
2273                                         talloc_free(mem_ctx);
2274                                         return -1;
2275                                 }
2276                         }
2277                 } else {
2278                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2279                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2280                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2281
2282                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2283                                 if (ret != 0) {
2284                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2285
2286                                         talloc_free(mem_ctx);
2287                                         return -1;
2288                                 }
2289                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2290                                 if (ret != 0) {
2291                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2292
2293                                         talloc_free(mem_ctx);
2294                                         return -1;
2295                                 }
2296                         }
2297                 }
2298         }
2299
2300         talloc_free(mem_ctx);
2301         return 0;
2302 }
2303
2304
2305 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2306 {
2307         struct ctdb_node_map **remote_nodemaps = callback_data;
2308
2309         if (node_pnn >= ctdb->num_nodes) {
2310                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2311                 return;
2312         }
2313
2314         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2315
2316 }
2317
2318 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2319         struct ctdb_node_map *nodemap,
2320         struct ctdb_node_map **remote_nodemaps)
2321 {
2322         uint32_t *nodes;
2323
2324         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2325         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2326                                         nodes,
2327                                         CONTROL_TIMEOUT(), false, tdb_null,
2328                                         async_getnodemap_callback,
2329                                         NULL,
2330                                         remote_nodemaps) != 0) {
2331                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2332
2333                 return -1;
2334         }
2335
2336         return 0;
2337 }
2338
2339 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2340 struct ctdb_check_reclock_state {
2341         struct ctdb_context *ctdb;
2342         struct timeval start_time;
2343         int fd[2];
2344         pid_t child;
2345         struct timed_event *te;
2346         struct fd_event *fde;
2347         enum reclock_child_status status;
2348 };
2349
2350 /* when we free the reclock state we must kill any child process.
2351 */
2352 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2353 {
2354         struct ctdb_context *ctdb = state->ctdb;
2355
2356         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2357
2358         if (state->fd[0] != -1) {
2359                 close(state->fd[0]);
2360                 state->fd[0] = -1;
2361         }
2362         if (state->fd[1] != -1) {
2363                 close(state->fd[1]);
2364                 state->fd[1] = -1;
2365         }
2366         kill(state->child, SIGKILL);
2367         return 0;
2368 }
2369
2370 /*
2371   called if our check_reclock child times out. this would happen if
2372   i/o to the reclock file blocks.
2373  */
2374 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2375                                          struct timeval t, void *private_data)
2376 {
2377         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2378                                            struct ctdb_check_reclock_state);
2379
2380         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2381         state->status = RECLOCK_TIMEOUT;
2382 }
2383
2384 /* this is called when the child process has completed checking the reclock
2385    file and has written data back to us through the pipe.
2386 */
2387 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2388                              uint16_t flags, void *private_data)
2389 {
2390         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2391                                              struct ctdb_check_reclock_state);
2392         char c = 0;
2393         int ret;
2394
2395         /* we got a response from our child process so we can abort the
2396            timeout.
2397         */
2398         talloc_free(state->te);
2399         state->te = NULL;
2400
2401         ret = read(state->fd[0], &c, 1);
2402         if (ret != 1 || c != RECLOCK_OK) {
2403                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2404                 state->status = RECLOCK_FAILED;
2405
2406                 return;
2407         }
2408
2409         state->status = RECLOCK_OK;
2410         return;
2411 }
2412
2413 static int check_recovery_lock(struct ctdb_context *ctdb)
2414 {
2415         int ret;
2416         struct ctdb_check_reclock_state *state;
2417         pid_t parent = getpid();
2418
2419         if (ctdb->recovery_lock_fd == -1) {
2420                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2421                 return -1;
2422         }
2423
2424         state = talloc(ctdb, struct ctdb_check_reclock_state);
2425         CTDB_NO_MEMORY(ctdb, state);
2426
2427         state->ctdb = ctdb;
2428         state->start_time = timeval_current();
2429         state->status = RECLOCK_CHECKING;
2430         state->fd[0] = -1;
2431         state->fd[1] = -1;
2432
2433         ret = pipe(state->fd);
2434         if (ret != 0) {
2435                 talloc_free(state);
2436                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2437                 return -1;
2438         }
2439
2440         state->child = fork();
2441         if (state->child == (pid_t)-1) {
2442                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2443                 close(state->fd[0]);
2444                 state->fd[0] = -1;
2445                 close(state->fd[1]);
2446                 state->fd[1] = -1;
2447                 talloc_free(state);
2448                 return -1;
2449         }
2450
2451         if (state->child == 0) {
2452                 char cc = RECLOCK_OK;
2453                 close(state->fd[0]);
2454                 state->fd[0] = -1;
2455
2456                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2457                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2458                         cc = RECLOCK_FAILED;
2459                 }
2460
2461                 write(state->fd[1], &cc, 1);
2462                 /* make sure we die when our parent dies */
2463                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2464                         sleep(5);
2465                         write(state->fd[1], &cc, 1);
2466                 }
2467                 _exit(0);
2468         }
2469         close(state->fd[1]);
2470         state->fd[1] = -1;
2471
2472         talloc_set_destructor(state, check_reclock_destructor);
2473
2474         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2475                                     ctdb_check_reclock_timeout, state);
2476         if (state->te == NULL) {
2477                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2478                 talloc_free(state);
2479                 return -1;
2480         }
2481
2482         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2483                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2484                                 reclock_child_handler,
2485                                 (void *)state);
2486
2487         if (state->fde == NULL) {
2488                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2489                 talloc_free(state);
2490                 return -1;
2491         }
2492
2493         while (state->status == RECLOCK_CHECKING) {
2494                 event_loop_once(ctdb->ev);
2495         }
2496
2497         if (state->status == RECLOCK_FAILED) {
2498                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2499                 close(ctdb->recovery_lock_fd);
2500                 ctdb->recovery_lock_fd = -1;
2501                 talloc_free(state);
2502                 return -1;
2503         }
2504
2505         talloc_free(state);
2506         return 0;
2507 }
2508
2509 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2510 {
2511         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2512         const char *reclockfile;
2513
2514         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2515                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2516                 talloc_free(tmp_ctx);
2517                 return -1;      
2518         }
2519
2520         if (reclockfile == NULL) {
2521                 if (ctdb->recovery_lock_file != NULL) {
2522                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2523                         talloc_free(ctdb->recovery_lock_file);
2524                         ctdb->recovery_lock_file = NULL;
2525                         if (ctdb->recovery_lock_fd != -1) {
2526                                 close(ctdb->recovery_lock_fd);
2527                                 ctdb->recovery_lock_fd = -1;
2528                         }
2529                 }
2530                 ctdb->tunable.verify_recovery_lock = 0;
2531                 talloc_free(tmp_ctx);
2532                 return 0;
2533         }
2534
2535         if (ctdb->recovery_lock_file == NULL) {
2536                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2537                 if (ctdb->recovery_lock_fd != -1) {
2538                         close(ctdb->recovery_lock_fd);
2539                         ctdb->recovery_lock_fd = -1;
2540                 }
2541                 talloc_free(tmp_ctx);
2542                 return 0;
2543         }
2544
2545
2546         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2547                 talloc_free(tmp_ctx);
2548                 return 0;
2549         }
2550
2551         talloc_free(ctdb->recovery_lock_file);
2552         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2553         ctdb->tunable.verify_recovery_lock = 0;
2554         if (ctdb->recovery_lock_fd != -1) {
2555                 close(ctdb->recovery_lock_fd);
2556                 ctdb->recovery_lock_fd = -1;
2557         }
2558
2559         talloc_free(tmp_ctx);
2560         return 0;
2561 }
2562                 
2563 /*
2564   the main monitoring loop
2565  */
2566 static void monitor_cluster(struct ctdb_context *ctdb)
2567 {
2568         uint32_t pnn;
2569         TALLOC_CTX *mem_ctx=NULL;
2570         struct ctdb_node_map *nodemap=NULL;
2571         struct ctdb_node_map *recmaster_nodemap=NULL;
2572         struct ctdb_node_map **remote_nodemaps=NULL;
2573         struct ctdb_vnn_map *vnnmap=NULL;
2574         struct ctdb_vnn_map *remote_vnnmap=NULL;
2575         int32_t debug_level;
2576         int i, j, ret;
2577         struct ctdb_recoverd *rec;
2578
2579         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2580
2581         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2582         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2583
2584         rec->ctdb = ctdb;
2585         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2586         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2587
2588         rec->priority_time = timeval_current();
2589
2590         /* register a message port for sending memory dumps */
2591         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2592
2593         /* register a message port for recovery elections */
2594         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2595
2596         /* when nodes are disabled/enabled */
2597         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2598
2599         /* when we are asked to puch out a flag change */
2600         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2601
2602         /* when nodes are banned */
2603         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2604
2605         /* and one for when nodes are unbanned */
2606         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2607
2608         /* register a message port for vacuum fetch */
2609         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2610
2611         /* register a message port for reloadnodes  */
2612         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2613
2614 again:
2615         if (mem_ctx) {
2616                 talloc_free(mem_ctx);
2617                 mem_ctx = NULL;
2618         }
2619         mem_ctx = talloc_new(ctdb);
2620         if (!mem_ctx) {
2621                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2622                 exit(-1);
2623         }
2624
2625         /* we only check for recovery once every second */
2626         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2627
2628         /* verify that the main daemon is still running */
2629         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2630                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2631                 exit(-1);
2632         }
2633
2634         /* ping the local daemon to tell it we are alive */
2635         ctdb_ctrl_recd_ping(ctdb);
2636
2637         if (rec->election_timeout) {
2638                 /* an election is in progress */
2639                 goto again;
2640         }
2641
2642         /* read the debug level from the parent and update locally */
2643         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2644         if (ret !=0) {
2645                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2646                 goto again;
2647         }
2648         LogLevel = debug_level;
2649
2650
2651         /* We must check if we need to ban a node here but we want to do this
2652            as early as possible so we dont wait until we have pulled the node
2653            map from the local node. thats why we have the hardcoded value 20
2654         */
2655         if (rec->culprit_counter > 20) {
2656                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2657                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2658                          ctdb->tunable.recovery_ban_period));
2659                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2660         }
2661
2662         /* get relevant tunables */
2663         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2664         if (ret != 0) {
2665                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2666                 goto again;
2667         }
2668
2669         /* get the current recovery lock file from the server */
2670         if (update_recovery_lock_file(ctdb) != 0) {
2671                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2672                 goto again;
2673         }
2674
2675         /* Make sure that if recovery lock verification becomes disabled when
2676            we close the file
2677         */
2678         if (ctdb->tunable.verify_recovery_lock == 0) {
2679                 if (ctdb->recovery_lock_fd != -1) {
2680                         close(ctdb->recovery_lock_fd);
2681                         ctdb->recovery_lock_fd = -1;
2682                 }
2683         }
2684
2685         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2686         if (pnn == (uint32_t)-1) {
2687                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2688                 goto again;
2689         }
2690
2691         /* get the vnnmap */
2692         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2693         if (ret != 0) {
2694                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2695                 goto again;
2696         }
2697
2698
2699         /* get number of nodes */
2700         if (rec->nodemap) {
2701                 talloc_free(rec->nodemap);
2702                 rec->nodemap = NULL;
2703                 nodemap=NULL;
2704         }
2705         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2706         if (ret != 0) {
2707                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2708                 goto again;
2709         }
2710         nodemap = rec->nodemap;
2711
2712         /* check which node is the recovery master */
2713         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2714         if (ret != 0) {
2715                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2716                 goto again;
2717         }
2718
2719         if (rec->recmaster == (uint32_t)-1) {
2720                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2721                 force_election(rec, pnn, nodemap);
2722                 goto again;
2723         }
2724         
2725         /* check that we (recovery daemon) and the local ctdb daemon
2726            agrees on whether we are banned or not
2727         */
2728         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2729                 if (rec->banned_nodes[pnn] == NULL) {
2730                         if (rec->recmaster == pnn) {
2731                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2732
2733                                 ctdb_unban_node(rec, pnn);
2734                         } else {
2735                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2736                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2737                                 ctdb_set_culprit(rec, pnn);
2738                         }
2739                         goto again;
2740                 }
2741         } else {
2742                 if (rec->banned_nodes[pnn] != NULL) {
2743                         if (rec->recmaster == pnn) {
2744                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2745
2746                                 ctdb_unban_node(rec, pnn);
2747                         } else {
2748                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2749
2750                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2751                                 ctdb_set_culprit(rec, pnn);
2752                         }
2753                         goto again;
2754                 }
2755         }
2756
2757         /* remember our own node flags */
2758         rec->node_flags = nodemap->nodes[pnn].flags;
2759
2760         /* count how many active nodes there are */
2761         rec->num_active    = 0;
2762         rec->num_connected = 0;
2763         for (i=0; i<nodemap->num; i++) {
2764                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2765                         rec->num_active++;
2766                 }
2767                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2768                         rec->num_connected++;
2769                 }
2770         }
2771
2772
2773         /* verify that the recmaster node is still active */
2774         for (j=0; j<nodemap->num; j++) {
2775                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2776                         break;
2777                 }
2778         }
2779
2780         if (j == nodemap->num) {
2781                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2782                 force_election(rec, pnn, nodemap);
2783                 goto again;
2784         }
2785
2786         /* if recovery master is disconnected we must elect a new recmaster */
2787         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2788                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2789                 force_election(rec, pnn, nodemap);
2790                 goto again;
2791         }
2792
2793         /* grap the nodemap from the recovery master to check if it is banned */
2794         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2795                                    mem_ctx, &recmaster_nodemap);
2796         if (ret != 0) {
2797                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2798                           nodemap->nodes[j].pnn));
2799                 goto again;
2800         }
2801
2802
2803         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2804                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2805                 force_election(rec, pnn, nodemap);
2806                 goto again;
2807         }
2808
2809
2810         /* verify that we have all ip addresses we should have and we dont
2811          * have addresses we shouldnt have.
2812          */ 
2813         if (ctdb->do_checkpublicip) {
2814                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2815                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2816                         goto again;
2817                 }
2818         }
2819
2820
2821         /* if we are not the recmaster then we do not need to check
2822            if recovery is needed
2823          */
2824         if (pnn != rec->recmaster) {
2825                 goto again;
2826         }
2827
2828
2829         /* ensure our local copies of flags are right */
2830         ret = update_local_flags(rec, nodemap);
2831         if (ret == MONITOR_ELECTION_NEEDED) {
2832                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2833                 force_election(rec, pnn, nodemap);
2834                 goto again;
2835         }
2836         if (ret != MONITOR_OK) {
2837                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2838                 goto again;
2839         }
2840
2841         /* update the list of public ips that a node can handle for
2842            all connected nodes
2843         */
2844         if (ctdb->num_nodes != nodemap->num) {
2845                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2846                 reload_nodes_file(ctdb);
2847                 goto again;
2848         }
2849         for (j=0; j<nodemap->num; j++) {
2850                 /* release any existing data */
2851                 if (ctdb->nodes[j]->public_ips) {
2852                         talloc_free(ctdb->nodes[j]->public_ips);
2853                         ctdb->nodes[j]->public_ips = NULL;
2854                 }
2855
2856                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2857                         continue;
2858                 }
2859
2860                 /* grab a new shiny list of public ips from the node */
2861                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2862                         ctdb->nodes[j]->pnn, 
2863                         ctdb->nodes,
2864                         &ctdb->nodes[j]->public_ips)) {
2865                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2866                                 ctdb->nodes[j]->pnn));
2867                         goto again;
2868                 }
2869         }
2870
2871
2872         /* verify that all active nodes agree that we are the recmaster */
2873         switch (verify_recmaster(rec, nodemap, pnn)) {
2874         case MONITOR_RECOVERY_NEEDED:
2875                 /* can not happen */
2876                 goto again;
2877         case MONITOR_ELECTION_NEEDED:
2878                 force_election(rec, pnn, nodemap);
2879                 goto again;
2880         case MONITOR_OK:
2881                 break;
2882         case MONITOR_FAILED:
2883                 goto again;
2884         }
2885
2886
2887         if (rec->need_recovery) {
2888                 /* a previous recovery didn't finish */
2889                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
2890                 goto again;             
2891         }
2892
2893         /* verify that all active nodes are in normal mode 
2894            and not in recovery mode 
2895          */
2896         switch (verify_recmode(ctdb, nodemap)) {
2897         case MONITOR_RECOVERY_NEEDED:
2898                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2899                 goto again;
2900         case MONITOR_FAILED:
2901                 goto again;
2902         case MONITOR_ELECTION_NEEDED:
2903                 /* can not happen */
2904         case MONITOR_OK:
2905                 break;
2906         }
2907
2908
2909         if (ctdb->tunable.verify_recovery_lock != 0) {
2910                 /* we should have the reclock - check its not stale */
2911                 ret = check_recovery_lock(ctdb);
2912                 if (ret != 0) {
2913                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2914                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2915                         goto again;
2916                 }
2917         }
2918
2919         /* get the nodemap for all active remote nodes
2920          */
2921         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2922         if (remote_nodemaps == NULL) {
2923                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2924                 goto again;
2925         }
2926         for(i=0; i<nodemap->num; i++) {
2927                 remote_nodemaps[i] = NULL;
2928         }
2929         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2930                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2931                 goto again;
2932         } 
2933
2934         /* verify that all other nodes have the same nodemap as we have
2935         */
2936         for (j=0; j<nodemap->num; j++) {
2937                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2938                         continue;
2939                 }
2940
2941                 if (remote_nodemaps[j] == NULL) {
2942                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2943                         ctdb_set_culprit(rec, j);
2944
2945                         goto again;
2946                 }
2947
2948                 /* if the nodes disagree on how many nodes there are
2949                    then this is a good reason to try recovery
2950                  */
2951                 if (remote_nodemaps[j]->num != nodemap->num) {
2952                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2953                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2954                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2955                         goto again;
2956                 }
2957
2958                 /* if the nodes disagree on which nodes exist and are
2959                    active, then that is also a good reason to do recovery
2960                  */
2961                 for (i=0;i<nodemap->num;i++) {
2962                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2963                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2964                                           nodemap->nodes[j].pnn, i, 
2965                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2966                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2967                                             vnnmap, nodemap->nodes[j].pnn);
2968                                 goto again;
2969                         }
2970                 }
2971
2972                 /* verify the flags are consistent
2973                 */
2974                 for (i=0; i<nodemap->num; i++) {
2975                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2976                                 continue;
2977                         }
2978                         
2979                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2980                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2981                                   nodemap->nodes[j].pnn, 
2982                                   nodemap->nodes[i].pnn, 
2983                                   remote_nodemaps[j]->nodes[i].flags,
2984                                   nodemap->nodes[j].flags));
2985                                 if (i == j) {
2986                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2987                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2988                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2989                                                     vnnmap, nodemap->nodes[j].pnn);
2990                                         goto again;
2991                                 } else {
2992                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2993                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2994                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2995                                                     vnnmap, nodemap->nodes[j].pnn);
2996                                         goto again;
2997                                 }
2998                         }
2999                 }
3000         }
3001
3002
3003         /* there better be the same number of lmasters in the vnn map
3004            as there are active nodes or we will have to do a recovery
3005          */
3006         if (vnnmap->size != rec->num_active) {
3007                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3008                           vnnmap->size, rec->num_active));
3009                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
3010                 goto again;
3011         }
3012
3013         /* verify that all active nodes in the nodemap also exist in 
3014            the vnnmap.
3015          */
3016         for (j=0; j<nodemap->num; j++) {
3017                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3018                         continue;
3019                 }
3020                 if (nodemap->nodes[j].pnn == pnn) {
3021                         continue;
3022                 }
3023
3024                 for (i=0; i<vnnmap->size; i++) {
3025                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3026                                 break;
3027                         }
3028                 }
3029                 if (i == vnnmap->size) {
3030                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3031                                   nodemap->nodes[j].pnn));
3032                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3033                         goto again;
3034                 }
3035         }
3036
3037         
3038         /* verify that all other nodes have the same vnnmap
3039            and are from the same generation
3040          */
3041         for (j=0; j<nodemap->num; j++) {
3042                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3043                         continue;
3044                 }
3045                 if (nodemap->nodes[j].pnn == pnn) {
3046                         continue;
3047                 }
3048
3049                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3050                                           mem_ctx, &remote_vnnmap);
3051                 if (ret != 0) {
3052                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3053                                   nodemap->nodes[j].pnn));
3054                         goto again;
3055                 }
3056
3057                 /* verify the vnnmap generation is the same */
3058                 if (vnnmap->generation != remote_vnnmap->generation) {
3059                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3060                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3061                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3062                         goto again;
3063                 }
3064
3065                 /* verify the vnnmap size is the same */
3066                 if (vnnmap->size != remote_vnnmap->size) {
3067                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3068                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3069                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3070                         goto again;
3071                 }
3072
3073                 /* verify the vnnmap is the same */
3074                 for (i=0;i<vnnmap->size;i++) {
3075                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3076                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3077                                           nodemap->nodes[j].pnn));
3078                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3079                                             vnnmap, nodemap->nodes[j].pnn);
3080                                 goto again;
3081                         }
3082                 }
3083         }
3084
3085         /* we might need to change who has what IP assigned */
3086         if (rec->need_takeover_run) {
3087                 rec->need_takeover_run = false;
3088
3089                 /* execute the "startrecovery" event script on all nodes */
3090                 ret = run_startrecovery_eventscript(rec, nodemap);
3091                 if (ret!=0) {
3092                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3093                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3094                                     vnnmap, ctdb->pnn);
3095                 }
3096
3097                 ret = ctdb_takeover_run(ctdb, nodemap);
3098                 if (ret != 0) {
3099                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3100                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3101                                     vnnmap, ctdb->pnn);
3102                 }
3103
3104                 /* execute the "recovered" event script on all nodes */
3105                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3106 #if 0
3107 // we cant check whether the event completed successfully
3108 // since this script WILL fail if the node is in recovery mode
3109 // and if that race happens, the code here would just cause a second
3110 // cascading recovery.
3111                 if (ret!=0) {
3112                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3113                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3114                                     vnnmap, ctdb->pnn);
3115                 }
3116 #endif
3117         }
3118
3119
3120         goto again;
3121
3122 }
3123
3124 /*
3125   event handler for when the main ctdbd dies
3126  */
3127 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3128                                  uint16_t flags, void *private_data)
3129 {
3130         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3131         _exit(1);
3132 }
3133
3134 /*
3135   called regularly to verify that the recovery daemon is still running
3136  */
3137 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3138                               struct timeval yt, void *p)
3139 {
3140         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3141
3142         if (kill(ctdb->recoverd_pid, 0) != 0) {
3143                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3144
3145                 ctdb_stop_recoverd(ctdb);
3146                 ctdb_stop_keepalive(ctdb);
3147                 ctdb_stop_monitoring(ctdb);
3148                 ctdb_release_all_ips(ctdb);
3149                 if (ctdb->methods != NULL) {
3150                         ctdb->methods->shutdown(ctdb);
3151                 }
3152                 ctdb_event_script(ctdb, "shutdown");
3153
3154                 exit(10);       
3155         }
3156
3157         event_add_timed(ctdb->ev, ctdb, 
3158                         timeval_current_ofs(30, 0),
3159                         ctdb_check_recd, ctdb);
3160 }
3161
3162 static void recd_sig_child_handler(struct event_context *ev,
3163         struct signal_event *se, int signum, int count,
3164         void *dont_care, 
3165         void *private_data)
3166 {
3167 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3168         int status;
3169         pid_t pid = -1;
3170
3171         while (pid != 0) {
3172                 pid = waitpid(-1, &status, WNOHANG);
3173                 if (pid == -1) {
3174                         if (errno != ECHILD) {
3175                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3176                         }
3177                         return;
3178                 }
3179                 if (pid > 0) {
3180                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3181                 }
3182         }
3183 }
3184
3185 /*
3186   startup the recovery daemon as a child of the main ctdb daemon
3187  */
3188 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3189 {
3190         int fd[2];
3191         struct signal_event *se;
3192
3193         if (pipe(fd) != 0) {
3194                 return -1;
3195         }
3196
3197         ctdb->ctdbd_pid = getpid();
3198
3199         ctdb->recoverd_pid = fork();
3200         if (ctdb->recoverd_pid == -1) {
3201                 return -1;
3202         }
3203         
3204         if (ctdb->recoverd_pid != 0) {
3205                 close(fd[0]);
3206                 event_add_timed(ctdb->ev, ctdb, 
3207                                 timeval_current_ofs(30, 0),
3208                                 ctdb_check_recd, ctdb);
3209                 return 0;
3210         }
3211
3212         close(fd[1]);
3213
3214         srandom(getpid() ^ time(NULL));
3215
3216         if (switch_from_server_to_client(ctdb) != 0) {
3217                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3218                 exit(1);
3219         }
3220
3221         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3222                      ctdb_recoverd_parent, &fd[0]);     
3223
3224         /* set up a handler to pick up sigchld */
3225         se = event_add_signal(ctdb->ev, ctdb,
3226                                      SIGCHLD, 0,
3227                                      recd_sig_child_handler,
3228                                      ctdb);
3229         if (se == NULL) {
3230                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3231                 exit(1);
3232         }
3233
3234         monitor_cluster(ctdb);
3235
3236         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3237         return -1;
3238 }
3239
3240 /*
3241   shutdown the recovery daemon
3242  */
3243 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3244 {
3245         if (ctdb->recoverd_pid == 0) {
3246                 return;
3247         }
3248
3249         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3250         kill(ctdb->recoverd_pid, SIGTERM);
3251 }