add a new tunable : reclockpingperiod
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         int rec_file_fd;
45         uint32_t recmaster;
46         uint32_t num_active;
47         struct ctdb_node_map *nodemap;
48         uint32_t last_culprit;
49         uint32_t culprit_counter;
50         struct timeval first_recover_time;
51         struct ban_state **banned_nodes;
52         struct timeval priority_time;
53         bool need_takeover_run;
54         bool need_recovery;
55         uint32_t node_flags;
56         struct timed_event *send_election_te;
57         struct timed_event *election_timeout;
58         struct vacuum_info *vacuum_info;
59 };
60
61 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
62 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
63
64
65 /*
66   unban a node
67  */
68 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
69 {
70         struct ctdb_context *ctdb = rec->ctdb;
71
72         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
73
74         if (!ctdb_validate_pnn(ctdb, pnn)) {
75                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
76                 return;
77         }
78
79         /* If we are unbanning a different node then just pass the ban info on */
80         if (pnn != ctdb->pnn) {
81                 TDB_DATA data;
82                 int ret;
83                 
84                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
85
86                 data.dptr = (uint8_t *)&pnn;
87                 data.dsize = sizeof(uint32_t);
88
89                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
90                 if (ret != 0) {
91                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
92                         return;
93                 }
94
95                 return;
96         }
97
98         /* make sure we remember we are no longer banned in case 
99            there is an election */
100         rec->node_flags &= ~NODE_FLAGS_BANNED;
101
102         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
103         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
104
105         if (rec->banned_nodes[pnn] == NULL) {
106                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
107                 return;
108         }
109
110         talloc_free(rec->banned_nodes[pnn]);
111         rec->banned_nodes[pnn] = NULL;
112 }
113
114
115 /*
116   called when a ban has timed out
117  */
118 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
119 {
120         struct ban_state *state = talloc_get_type(p, struct ban_state);
121         struct ctdb_recoverd *rec = state->rec;
122         uint32_t pnn = state->banned_node;
123
124         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
125         ctdb_unban_node(rec, pnn);
126 }
127
128 /*
129   ban a node for a period of time
130  */
131 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
132 {
133         struct ctdb_context *ctdb = rec->ctdb;
134
135         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
136
137         if (!ctdb_validate_pnn(ctdb, pnn)) {
138                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
139                 return;
140         }
141
142         if (0 == ctdb->tunable.enable_bans) {
143                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
144                 return;
145         }
146
147         /* If we are banning a different node then just pass the ban info on */
148         if (pnn != ctdb->pnn) {
149                 struct ctdb_ban_info b;
150                 TDB_DATA data;
151                 int ret;
152                 
153                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
154
155                 b.pnn = pnn;
156                 b.ban_time = ban_time;
157
158                 data.dptr = (uint8_t *)&b;
159                 data.dsize = sizeof(b);
160
161                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
162                 if (ret != 0) {
163                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
164                         return;
165                 }
166
167                 return;
168         }
169
170         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
171         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
172
173         /* banning ourselves - lower our election priority */
174         rec->priority_time = timeval_current();
175
176         /* make sure we remember we are banned in case there is an 
177            election */
178         rec->node_flags |= NODE_FLAGS_BANNED;
179
180         if (rec->banned_nodes[pnn] != NULL) {
181                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
182                 talloc_free(rec->banned_nodes[pnn]);
183                 rec->banned_nodes[pnn] = NULL;
184         }
185
186         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
187         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
188
189         rec->banned_nodes[pnn]->rec = rec;
190         rec->banned_nodes[pnn]->banned_node = pnn;
191
192         if (ban_time != 0) {
193                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
194                                 timeval_current_ofs(ban_time, 0),
195                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
196         }
197 }
198
199 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
200
201
202 /*
203   run the "recovered" eventscript on all nodes
204  */
205 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
206 {
207         TALLOC_CTX *tmp_ctx;
208
209         tmp_ctx = talloc_new(ctdb);
210         CTDB_NO_MEMORY(ctdb, tmp_ctx);
211
212         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
213                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
214                         CONTROL_TIMEOUT(), false, tdb_null) != 0) {
215                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event. Recovery failed.\n"));
216                 talloc_free(tmp_ctx);
217                 return -1;
218         }
219
220         talloc_free(tmp_ctx);
221         return 0;
222 }
223
224 /*
225   run the "startrecovery" eventscript on all nodes
226  */
227 static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
228 {
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
235                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
236                         CONTROL_TIMEOUT(), false, tdb_null) != 0) {
237                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
238                 talloc_free(tmp_ctx);
239                 return -1;
240         }
241
242         talloc_free(tmp_ctx);
243         return 0;
244 }
245
246 /*
247   change recovery mode on all nodes
248  */
249 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
250 {
251         TDB_DATA data;
252         uint32_t *nodes;
253         TALLOC_CTX *tmp_ctx;
254
255         tmp_ctx = talloc_new(ctdb);
256         CTDB_NO_MEMORY(ctdb, tmp_ctx);
257
258         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
259
260         /* freeze all nodes */
261         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
262                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
263                                                 nodes, CONTROL_TIMEOUT(),
264                                                 false, tdb_null) != 0) {
265                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
266                         talloc_free(tmp_ctx);
267                         return -1;
268                 }
269         }
270
271
272         data.dsize = sizeof(uint32_t);
273         data.dptr = (unsigned char *)&rec_mode;
274
275         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
276                                         nodes, CONTROL_TIMEOUT(),
277                                         false, data) != 0) {
278                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
279                 talloc_free(tmp_ctx);
280                 return -1;
281         }
282
283         if (rec_mode == CTDB_RECOVERY_NORMAL) {
284                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
285                                                 nodes, CONTROL_TIMEOUT(),
286                                                 false, tdb_null) != 0) {
287                         DEBUG(DEBUG_ERR, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
288                         talloc_free(tmp_ctx);
289                         return -1;
290                 }
291         }
292
293         talloc_free(tmp_ctx);
294         return 0;
295 }
296
297 /*
298   change recovery master on all node
299  */
300 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
301 {
302         TDB_DATA data;
303         TALLOC_CTX *tmp_ctx;
304
305         tmp_ctx = talloc_new(ctdb);
306         CTDB_NO_MEMORY(ctdb, tmp_ctx);
307
308         data.dsize = sizeof(uint32_t);
309         data.dptr = (unsigned char *)&pnn;
310
311         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
312                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
313                         CONTROL_TIMEOUT(), false, data) != 0) {
314                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
315                 talloc_free(tmp_ctx);
316                 return -1;
317         }
318
319         talloc_free(tmp_ctx);
320         return 0;
321 }
322
323
324 /*
325   ensure all other nodes have attached to any databases that we have
326  */
327 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
328                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
329 {
330         int i, j, db, ret;
331         struct ctdb_dbid_map *remote_dbmap;
332
333         /* verify that all other nodes have all our databases */
334         for (j=0; j<nodemap->num; j++) {
335                 /* we dont need to ourself ourselves */
336                 if (nodemap->nodes[j].pnn == pnn) {
337                         continue;
338                 }
339                 /* dont check nodes that are unavailable */
340                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
341                         continue;
342                 }
343
344                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
345                                          mem_ctx, &remote_dbmap);
346                 if (ret != 0) {
347                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
348                         return -1;
349                 }
350
351                 /* step through all local databases */
352                 for (db=0; db<dbmap->num;db++) {
353                         const char *name;
354
355
356                         for (i=0;i<remote_dbmap->num;i++) {
357                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
358                                         break;
359                                 }
360                         }
361                         /* the remote node already have this database */
362                         if (i!=remote_dbmap->num) {
363                                 continue;
364                         }
365                         /* ok so we need to create this database */
366                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
367                                             mem_ctx, &name);
368                         if (ret != 0) {
369                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
370                                 return -1;
371                         }
372                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
373                                            mem_ctx, name, dbmap->dbs[db].persistent);
374                         if (ret != 0) {
375                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
376                                 return -1;
377                         }
378                 }
379         }
380
381         return 0;
382 }
383
384
385 /*
386   ensure we are attached to any databases that anyone else is attached to
387  */
388 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
389                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
390 {
391         int i, j, db, ret;
392         struct ctdb_dbid_map *remote_dbmap;
393
394         /* verify that we have all database any other node has */
395         for (j=0; j<nodemap->num; j++) {
396                 /* we dont need to ourself ourselves */
397                 if (nodemap->nodes[j].pnn == pnn) {
398                         continue;
399                 }
400                 /* dont check nodes that are unavailable */
401                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
402                         continue;
403                 }
404
405                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
406                                          mem_ctx, &remote_dbmap);
407                 if (ret != 0) {
408                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
409                         return -1;
410                 }
411
412                 /* step through all databases on the remote node */
413                 for (db=0; db<remote_dbmap->num;db++) {
414                         const char *name;
415
416                         for (i=0;i<(*dbmap)->num;i++) {
417                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
418                                         break;
419                                 }
420                         }
421                         /* we already have this db locally */
422                         if (i!=(*dbmap)->num) {
423                                 continue;
424                         }
425                         /* ok so we need to create this database and
426                            rebuild dbmap
427                          */
428                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
429                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
430                         if (ret != 0) {
431                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
432                                           nodemap->nodes[j].pnn));
433                                 return -1;
434                         }
435                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
436                                            remote_dbmap->dbs[db].persistent);
437                         if (ret != 0) {
438                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
439                                 return -1;
440                         }
441                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   pull the remote database contents from one node into the recdb
455  */
456 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
457                                     struct tdb_wrap *recdb, uint32_t dbid)
458 {
459         int ret;
460         TDB_DATA outdata;
461         struct ctdb_control_pulldb_reply *reply;
462         struct ctdb_rec_data *rec;
463         int i;
464         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
465
466         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
467                                CONTROL_TIMEOUT(), &outdata);
468         if (ret != 0) {
469                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
470                 talloc_free(tmp_ctx);
471                 return -1;
472         }
473
474         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
475
476         if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
477                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
478                 talloc_free(tmp_ctx);
479                 return -1;
480         }
481         
482         rec = (struct ctdb_rec_data *)&reply->data[0];
483         
484         for (i=0;
485              i<reply->count;
486              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
487                 TDB_DATA key, data;
488                 struct ctdb_ltdb_header *hdr;
489                 TDB_DATA existing;
490                 
491                 key.dptr = &rec->data[0];
492                 key.dsize = rec->keylen;
493                 data.dptr = &rec->data[key.dsize];
494                 data.dsize = rec->datalen;
495                 
496                 hdr = (struct ctdb_ltdb_header *)data.dptr;
497
498                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
499                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
500                         talloc_free(tmp_ctx);
501                         return -1;
502                 }
503
504                 /* fetch the existing record, if any */
505                 existing = tdb_fetch(recdb->tdb, key);
506                 
507                 if (existing.dptr != NULL) {
508                         struct ctdb_ltdb_header header;
509                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
510                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
511                                          (unsigned)existing.dsize, srcnode));
512                                 free(existing.dptr);
513                                 talloc_free(tmp_ctx);
514                                 return -1;
515                         }
516                         header = *(struct ctdb_ltdb_header *)existing.dptr;
517                         free(existing.dptr);
518                         if (!(header.rsn < hdr->rsn ||
519                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
520                                 continue;
521                         }
522                 }
523                 
524                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
525                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
526                         talloc_free(tmp_ctx);
527                         return -1;                              
528                 }
529         }
530
531         talloc_free(tmp_ctx);
532
533         return 0;
534 }
535
536 /*
537   pull all the remote database contents into the recdb
538  */
539 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
540                                 struct tdb_wrap *recdb, uint32_t dbid)
541 {
542         int j;
543
544         /* pull all records from all other nodes across onto this node
545            (this merges based on rsn)
546         */
547         for (j=0; j<nodemap->num; j++) {
548                 /* dont merge from nodes that are unavailable */
549                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
550                         continue;
551                 }
552                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
553                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
554                                  nodemap->nodes[j].pnn));
555                         return -1;
556                 }
557         }
558         
559         return 0;
560 }
561
562
563 /*
564   update flags on all active nodes
565  */
566 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
567 {
568         int i;
569         for (i=0;i<nodemap->num;i++) {
570                 struct ctdb_node_flag_change c;
571                 TDB_DATA data;
572
573                 c.pnn = nodemap->nodes[i].pnn;
574                 c.old_flags = nodemap->nodes[i].flags;
575                 c.new_flags = nodemap->nodes[i].flags;
576
577                 data.dptr = (uint8_t *)&c;
578                 data.dsize = sizeof(c);
579
580                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
581                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
582
583         }
584         return 0;
585 }
586
587
588 /*
589   ensure all nodes have the same vnnmap we do
590  */
591 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
592                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
593 {
594         int j, ret;
595
596         /* push the new vnn map out to all the nodes */
597         for (j=0; j<nodemap->num; j++) {
598                 /* dont push to nodes that are unavailable */
599                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
600                         continue;
601                 }
602
603                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
604                 if (ret != 0) {
605                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
606                         return -1;
607                 }
608         }
609
610         return 0;
611 }
612
613
614 /*
615   handler for when the admin bans a node
616 */
617 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
618                         TDB_DATA data, void *private_data)
619 {
620         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
621         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
622         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
623
624         if (data.dsize != sizeof(*b)) {
625                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
626                 talloc_free(mem_ctx);
627                 return;
628         }
629
630         if (b->pnn != ctdb->pnn) {
631                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
632                 return;
633         }
634
635         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
636                  b->pnn, b->ban_time));
637
638         ctdb_ban_node(rec, b->pnn, b->ban_time);
639         talloc_free(mem_ctx);
640 }
641
642 /*
643   handler for when the admin unbans a node
644 */
645 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
646                           TDB_DATA data, void *private_data)
647 {
648         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
649         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
650         uint32_t pnn;
651
652         if (data.dsize != sizeof(uint32_t)) {
653                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
654                 talloc_free(mem_ctx);
655                 return;
656         }
657         pnn = *(uint32_t *)data.dptr;
658
659         if (pnn != ctdb->pnn) {
660                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
661                 return;
662         }
663
664         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
665         ctdb_unban_node(rec, pnn);
666         talloc_free(mem_ctx);
667 }
668
669
670 struct vacuum_info {
671         struct vacuum_info *next, *prev;
672         struct ctdb_recoverd *rec;
673         uint32_t srcnode;
674         struct ctdb_db_context *ctdb_db;
675         struct ctdb_control_pulldb_reply *recs;
676         struct ctdb_rec_data *r;
677 };
678
679 static void vacuum_fetch_next(struct vacuum_info *v);
680
681 /*
682   called when a vacuum fetch has completed - just free it and do the next one
683  */
684 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
685 {
686         struct vacuum_info *v = talloc_get_type(state->async.private, struct vacuum_info);
687         talloc_free(state);
688         vacuum_fetch_next(v);
689 }
690
691
692 /*
693   process the next element from the vacuum list
694 */
695 static void vacuum_fetch_next(struct vacuum_info *v)
696 {
697         struct ctdb_call call;
698         struct ctdb_rec_data *r;
699
700         while (v->recs->count) {
701                 struct ctdb_client_call_state *state;
702                 TDB_DATA data;
703                 struct ctdb_ltdb_header *hdr;
704
705                 ZERO_STRUCT(call);
706                 call.call_id = CTDB_NULL_FUNC;
707                 call.flags = CTDB_IMMEDIATE_MIGRATION;
708
709                 r = v->r;
710                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
711                 v->recs->count--;
712
713                 call.key.dptr = &r->data[0];
714                 call.key.dsize = r->keylen;
715
716                 /* ensure we don't block this daemon - just skip a record if we can't get
717                    the chainlock */
718                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
719                         continue;
720                 }
721
722                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
723                 if (data.dptr == NULL) {
724                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
725                         continue;
726                 }
727
728                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
729                         free(data.dptr);
730                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
731                         continue;
732                 }
733                 
734                 hdr = (struct ctdb_ltdb_header *)data.dptr;
735                 if (hdr->dmaster == v->rec->ctdb->pnn) {
736                         /* its already local */
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741
742                 free(data.dptr);
743
744                 state = ctdb_call_send(v->ctdb_db, &call);
745                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
746                 if (state == NULL) {
747                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
748                         talloc_free(v);
749                         return;
750                 }
751                 state->async.fn = vacuum_fetch_callback;
752                 state->async.private = v;
753                 return;
754         }
755
756         talloc_free(v);
757 }
758
759
760 /*
761   destroy a vacuum info structure
762  */
763 static int vacuum_info_destructor(struct vacuum_info *v)
764 {
765         DLIST_REMOVE(v->rec->vacuum_info, v);
766         return 0;
767 }
768
769
770 /*
771   handler for vacuum fetch
772 */
773 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
774                                  TDB_DATA data, void *private_data)
775 {
776         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
777         struct ctdb_control_pulldb_reply *recs;
778         int ret, i;
779         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
780         const char *name;
781         struct ctdb_dbid_map *dbmap=NULL;
782         bool persistent = false;
783         struct ctdb_db_context *ctdb_db;
784         struct ctdb_rec_data *r;
785         uint32_t srcnode;
786         struct vacuum_info *v;
787
788         recs = (struct ctdb_control_pulldb_reply *)data.dptr;
789         r = (struct ctdb_rec_data *)&recs->data[0];
790
791         if (recs->count == 0) {
792                 return;
793         }
794
795         srcnode = r->reqid;
796
797         for (v=rec->vacuum_info;v;v=v->next) {
798                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
799                         /* we're already working on records from this node */
800                         return;
801                 }
802         }
803
804         /* work out if the database is persistent */
805         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
806         if (ret != 0) {
807                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
808                 talloc_free(tmp_ctx);
809                 return;
810         }
811
812         for (i=0;i<dbmap->num;i++) {
813                 if (dbmap->dbs[i].dbid == recs->db_id) {
814                         persistent = dbmap->dbs[i].persistent;
815                         break;
816                 }
817         }
818         if (i == dbmap->num) {
819                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
820                 talloc_free(tmp_ctx);
821                 return;         
822         }
823
824         /* find the name of this database */
825         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
826                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
827                 talloc_free(tmp_ctx);
828                 return;
829         }
830
831         /* attach to it */
832         ctdb_db = ctdb_attach(ctdb, name, persistent);
833         if (ctdb_db == NULL) {
834                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
835                 talloc_free(tmp_ctx);
836                 return;
837         }
838
839         v = talloc_zero(rec, struct vacuum_info);
840         if (v == NULL) {
841                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
842                 return;
843         }
844
845         v->rec = rec;
846         v->srcnode = srcnode;
847         v->ctdb_db = ctdb_db;
848         v->recs = talloc_memdup(v, recs, data.dsize);
849         if (v->recs == NULL) {
850                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
851                 talloc_free(v);
852                 return;         
853         }
854         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
855
856         DLIST_ADD(rec->vacuum_info, v);
857
858         talloc_set_destructor(v, vacuum_info_destructor);
859
860         vacuum_fetch_next(v);
861 }
862
863
864 /*
865   called when ctdb_wait_timeout should finish
866  */
867 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
868                               struct timeval yt, void *p)
869 {
870         uint32_t *timed_out = (uint32_t *)p;
871         (*timed_out) = 1;
872 }
873
874 /*
875   wait for a given number of seconds
876  */
877 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
878 {
879         uint32_t timed_out = 0;
880         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
881         while (!timed_out) {
882                 event_loop_once(ctdb->ev);
883         }
884 }
885
886 /*
887   called when an election times out (ends)
888  */
889 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
890                                   struct timeval t, void *p)
891 {
892         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
893         rec->election_timeout = NULL;
894 }
895
896
897 /*
898   wait for an election to finish. It finished election_timeout seconds after
899   the last election packet is received
900  */
901 static void ctdb_wait_election(struct ctdb_recoverd *rec)
902 {
903         struct ctdb_context *ctdb = rec->ctdb;
904         while (rec->election_timeout) {
905                 event_loop_once(ctdb->ev);
906         }
907 }
908
909 /*
910   remember the trouble maker
911  */
912 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
913 {
914         struct ctdb_context *ctdb = rec->ctdb;
915
916         if (rec->last_culprit != culprit ||
917             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
918                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
919                 /* either a new node is the culprit, or we've decided to forgive them */
920                 rec->last_culprit = culprit;
921                 rec->first_recover_time = timeval_current();
922                 rec->culprit_counter = 0;
923         }
924         rec->culprit_counter++;
925 }
926
927 /*
928   Update our local flags from all remote connected nodes. 
929   This is only run when we are or we belive we are the recovery master
930  */
931 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
932 {
933         int j;
934         struct ctdb_context *ctdb = rec->ctdb;
935         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
936
937         /* get the nodemap for all active remote nodes and verify
938            they are the same as for this node
939          */
940         for (j=0; j<nodemap->num; j++) {
941                 struct ctdb_node_map *remote_nodemap=NULL;
942                 int ret;
943
944                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
945                         continue;
946                 }
947                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
948                         continue;
949                 }
950
951                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
952                                            mem_ctx, &remote_nodemap);
953                 if (ret != 0) {
954                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
955                                   nodemap->nodes[j].pnn));
956                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
957                         talloc_free(mem_ctx);
958                         return MONITOR_FAILED;
959                 }
960                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
961                         struct ctdb_node_flag_change c;
962                         TDB_DATA data;
963
964                         /* We should tell our daemon about this so it
965                            updates its flags or else we will log the same 
966                            message again in the next iteration of recovery.
967                            Since we are the recovery master we can just as
968                            well update the flags on all nodes.
969                         */
970                         c.pnn = nodemap->nodes[j].pnn;
971                         c.old_flags = nodemap->nodes[j].flags;
972                         c.new_flags = remote_nodemap->nodes[j].flags;
973
974                         data.dptr = (uint8_t *)&c;
975                         data.dsize = sizeof(c);
976
977                         ctdb_send_message(ctdb, ctdb->pnn,
978                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
979                                         data);
980
981                         /* Update our local copy of the flags in the recovery
982                            daemon.
983                         */
984                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
985                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
986                                  nodemap->nodes[j].flags));
987                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
988
989                         /* If the BANNED flag has changed for the node
990                            this is a good reason to do a new election.
991                          */
992                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
993                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
994                                  nodemap->nodes[j].pnn, c.new_flags,
995                                  c.old_flags));
996                                 talloc_free(mem_ctx);
997                                 return MONITOR_ELECTION_NEEDED;
998                         }
999
1000                 }
1001                 talloc_free(remote_nodemap);
1002         }
1003         talloc_free(mem_ctx);
1004         return MONITOR_OK;
1005 }
1006
1007
1008 /* Create a new random generation ip. 
1009    The generation id can not be the INVALID_GENERATION id
1010 */
1011 static uint32_t new_generation(void)
1012 {
1013         uint32_t generation;
1014
1015         while (1) {
1016                 generation = random();
1017
1018                 if (generation != INVALID_GENERATION) {
1019                         break;
1020                 }
1021         }
1022
1023         return generation;
1024 }
1025
1026
1027 /*
1028   create a temporary working database
1029  */
1030 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1031 {
1032         char *name;
1033         struct tdb_wrap *recdb;
1034
1035         /* open up the temporary recovery database */
1036         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1037         if (name == NULL) {
1038                 return NULL;
1039         }
1040         unlink(name);
1041         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1042                               TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
1043         if (recdb == NULL) {
1044                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1045         }
1046
1047         talloc_free(name);
1048
1049         return recdb;
1050 }
1051
1052
1053 /* 
1054    a traverse function for pulling all relevent records from recdb
1055  */
1056 struct recdb_data {
1057         struct ctdb_context *ctdb;
1058         struct ctdb_control_pulldb_reply *recdata;
1059         uint32_t len;
1060         bool failed;
1061 };
1062
1063 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1064 {
1065         struct recdb_data *params = (struct recdb_data *)p;
1066         struct ctdb_rec_data *rec;
1067         struct ctdb_ltdb_header *hdr;
1068
1069         /* skip empty records */
1070         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1071                 return 0;
1072         }
1073
1074         /* update the dmaster field to point to us */
1075         hdr = (struct ctdb_ltdb_header *)data.dptr;
1076         hdr->dmaster = params->ctdb->pnn;
1077
1078         /* add the record to the blob ready to send to the nodes */
1079         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1080         if (rec == NULL) {
1081                 params->failed = true;
1082                 return -1;
1083         }
1084         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1085         if (params->recdata == NULL) {
1086                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1087                          rec->length + params->len, params->recdata->count));
1088                 params->failed = true;
1089                 return -1;
1090         }
1091         params->recdata->count++;
1092         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1093         params->len += rec->length;
1094         talloc_free(rec);
1095
1096         return 0;
1097 }
1098
1099 /*
1100   push the recdb database out to all nodes
1101  */
1102 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1103                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1104 {
1105         struct recdb_data params;
1106         struct ctdb_control_pulldb_reply *recdata;
1107         TDB_DATA outdata;
1108         TALLOC_CTX *tmp_ctx;
1109
1110         tmp_ctx = talloc_new(ctdb);
1111         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1112
1113         recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
1114         CTDB_NO_MEMORY(ctdb, recdata);
1115
1116         recdata->db_id = dbid;
1117
1118         params.ctdb = ctdb;
1119         params.recdata = recdata;
1120         params.len = offsetof(struct ctdb_control_pulldb_reply, data);
1121         params.failed = false;
1122
1123         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;
1128         }
1129
1130         if (params.failed) {
1131                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1132                 talloc_free(params.recdata);
1133                 talloc_free(tmp_ctx);
1134                 return -1;              
1135         }
1136
1137         recdata = params.recdata;
1138
1139         outdata.dptr = (void *)recdata;
1140         outdata.dsize = params.len;
1141
1142         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1143                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
1144                         CONTROL_TIMEOUT(), false, outdata) != 0) {
1145                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1146                 talloc_free(recdata);
1147                 talloc_free(tmp_ctx);
1148                 return -1;
1149         }
1150
1151         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1152                   dbid, recdata->count));
1153
1154         talloc_free(recdata);
1155         talloc_free(tmp_ctx);
1156
1157         return 0;
1158 }
1159
1160
1161 /*
1162   go through a full recovery on one database 
1163  */
1164 static int recover_database(struct ctdb_recoverd *rec, 
1165                             TALLOC_CTX *mem_ctx,
1166                             uint32_t dbid,
1167                             uint32_t pnn, 
1168                             struct ctdb_node_map *nodemap,
1169                             uint32_t transaction_id)
1170 {
1171         struct tdb_wrap *recdb;
1172         int ret;
1173         struct ctdb_context *ctdb = rec->ctdb;
1174         TDB_DATA data;
1175         struct ctdb_control_wipe_database w;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1199                         list_of_active_nodes(ctdb, nodemap, recdb, true),
1200                         CONTROL_TIMEOUT(), false, data) != 0) {
1201                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1202                 talloc_free(recdb);
1203                 return -1;
1204         }
1205         
1206         /* push out the correct database. This sets the dmaster and skips 
1207            the empty records */
1208         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1209         if (ret != 0) {
1210                 talloc_free(recdb);
1211                 return -1;
1212         }
1213
1214         /* all done with this database */
1215         talloc_free(recdb);
1216
1217         return 0;
1218 }
1219
1220                 
1221 /*
1222   we are the recmaster, and recovery is needed - start a recovery run
1223  */
1224 static int do_recovery(struct ctdb_recoverd *rec, 
1225                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1226                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1227                        uint32_t culprit)
1228 {
1229         struct ctdb_context *ctdb = rec->ctdb;
1230         int i, j, ret;
1231         uint32_t generation;
1232         struct ctdb_dbid_map *dbmap;
1233         TDB_DATA data;
1234
1235         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1236
1237         /* if recovery fails, force it again */
1238         rec->need_recovery = true;
1239
1240         ctdb_set_culprit(rec, culprit);
1241
1242         if (rec->culprit_counter > 2*nodemap->num) {
1243                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1244                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1245                          ctdb->tunable.recovery_ban_period));
1246                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1247         }
1248
1249         if (!ctdb_recovery_lock(ctdb, true)) {
1250                 ctdb_set_culprit(rec, pnn);
1251                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1252                 return -1;
1253         }
1254
1255         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1256
1257         /* get a list of all databases */
1258         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1259         if (ret != 0) {
1260                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1261                 return -1;
1262         }
1263
1264         /* we do the db creation before we set the recovery mode, so the freeze happens
1265            on all databases we will be dealing with. */
1266
1267         /* verify that we have all the databases any other node has */
1268         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1269         if (ret != 0) {
1270                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1271                 return -1;
1272         }
1273
1274         /* verify that all other nodes have all our databases */
1275         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1276         if (ret != 0) {
1277                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1278                 return -1;
1279         }
1280
1281         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1282
1283
1284         /* set recovery mode to active on all nodes */
1285         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1286         if (ret!=0) {
1287                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1288                 return -1;
1289         }
1290
1291         /* execute the "startrecovery" event script on all nodes */
1292         ret = run_startrecovery_eventscript(ctdb, nodemap);
1293         if (ret!=0) {
1294                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1295                 return -1;
1296         }
1297
1298         /* pick a new generation number */
1299         generation = new_generation();
1300
1301         /* change the vnnmap on this node to use the new generation 
1302            number but not on any other nodes.
1303            this guarantees that if we abort the recovery prematurely
1304            for some reason (a node stops responding?)
1305            that we can just return immediately and we will reenter
1306            recovery shortly again.
1307            I.e. we deliberately leave the cluster with an inconsistent
1308            generation id to allow us to abort recovery at any stage and
1309            just restart it from scratch.
1310          */
1311         vnnmap->generation = generation;
1312         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1313         if (ret != 0) {
1314                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1315                 return -1;
1316         }
1317
1318         data.dptr = (void *)&generation;
1319         data.dsize = sizeof(uint32_t);
1320
1321         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1322                         list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
1323                         CONTROL_TIMEOUT(), false, data) != 0) {
1324                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1325                 return -1;
1326         }
1327
1328         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1329
1330         for (i=0;i<dbmap->num;i++) {
1331                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1332                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1333                         return -1;
1334                 }
1335         }
1336
1337         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1338
1339         /* commit all the changes */
1340         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1341                         list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
1342                         CONTROL_TIMEOUT(), false, data) != 0) {
1343                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1344                 return -1;
1345         }
1346
1347         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1348         
1349
1350         /* build a new vnn map with all the currently active and
1351            unbanned nodes */
1352         generation = new_generation();
1353         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1354         CTDB_NO_MEMORY(ctdb, vnnmap);
1355         vnnmap->generation = generation;
1356         vnnmap->size = rec->num_active;
1357         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1358         for (i=j=0;i<nodemap->num;i++) {
1359                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1360                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
1361                 }
1362         }
1363
1364         /* update to the new vnnmap on all nodes */
1365         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1366         if (ret != 0) {
1367                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1368                 return -1;
1369         }
1370
1371         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1372
1373         /* update recmaster to point to us for all nodes */
1374         ret = set_recovery_master(ctdb, nodemap, pnn);
1375         if (ret!=0) {
1376                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1377                 return -1;
1378         }
1379
1380         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1381
1382         /*
1383           update all nodes to have the same flags that we have
1384          */
1385         ret = update_flags_on_all_nodes(ctdb, nodemap);
1386         if (ret != 0) {
1387                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
1388                 return -1;
1389         }
1390         
1391         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1392
1393         /*
1394           if enabled, tell nodes to takeover their public IPs
1395          */
1396         if (ctdb->vnn) {
1397                 rec->need_takeover_run = false;
1398                 ret = ctdb_takeover_run(ctdb, nodemap);
1399                 if (ret != 0) {
1400                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1401                         return -1;
1402                 }
1403                 DEBUG(DEBUG_INFO, (__location__ " Recovery - done takeover\n"));
1404         }
1405
1406         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1407
1408         /* execute the "recovered" event script on all nodes */
1409         ret = run_recovered_eventscript(ctdb, nodemap);
1410         if (ret!=0) {
1411                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
1412                 return -1;
1413         }
1414
1415         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1416
1417         /* disable recovery mode */
1418         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1419         if (ret!=0) {
1420                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1421                 return -1;
1422         }
1423
1424         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1425
1426         /* send a message to all clients telling them that the cluster 
1427            has been reconfigured */
1428         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1429
1430         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1431
1432         rec->need_recovery = false;
1433
1434         /* We just finished a recovery successfully. 
1435            We now wait for rerecovery_timeout before we allow 
1436            another recovery to take place.
1437         */
1438         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1439         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1440         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1441
1442         return 0;
1443 }
1444
1445
1446 /*
1447   elections are won by first checking the number of connected nodes, then
1448   the priority time, then the pnn
1449  */
1450 struct election_message {
1451         uint32_t num_connected;
1452         struct timeval priority_time;
1453         uint32_t pnn;
1454         uint32_t node_flags;
1455 };
1456
1457 /*
1458   form this nodes election data
1459  */
1460 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1461 {
1462         int ret, i;
1463         struct ctdb_node_map *nodemap;
1464         struct ctdb_context *ctdb = rec->ctdb;
1465
1466         ZERO_STRUCTP(em);
1467
1468         em->pnn = rec->ctdb->pnn;
1469         em->priority_time = rec->priority_time;
1470         em->node_flags = rec->node_flags;
1471
1472         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1473         if (ret != 0) {
1474                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1475                 return;
1476         }
1477
1478         for (i=0;i<nodemap->num;i++) {
1479                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1480                         em->num_connected++;
1481                 }
1482         }
1483         talloc_free(nodemap);
1484 }
1485
1486 /*
1487   see if the given election data wins
1488  */
1489 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1490 {
1491         struct election_message myem;
1492         int cmp = 0;
1493
1494         ctdb_election_data(rec, &myem);
1495
1496         /* we cant win if we are banned */
1497         if (rec->node_flags & NODE_FLAGS_BANNED) {
1498                 return false;
1499         }       
1500
1501         /* we will automatically win if the other node is banned */
1502         if (em->node_flags & NODE_FLAGS_BANNED) {
1503                 return true;
1504         }
1505
1506         /* try to use the most connected node */
1507         if (cmp == 0) {
1508                 cmp = (int)myem.num_connected - (int)em->num_connected;
1509         }
1510
1511         /* then the longest running node */
1512         if (cmp == 0) {
1513                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1514         }
1515
1516         if (cmp == 0) {
1517                 cmp = (int)myem.pnn - (int)em->pnn;
1518         }
1519
1520         return cmp > 0;
1521 }
1522
1523 /*
1524   send out an election request
1525  */
1526 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1527 {
1528         int ret;
1529         TDB_DATA election_data;
1530         struct election_message emsg;
1531         uint64_t srvid;
1532         struct ctdb_context *ctdb = rec->ctdb;
1533
1534         srvid = CTDB_SRVID_RECOVERY;
1535
1536         ctdb_election_data(rec, &emsg);
1537
1538         election_data.dsize = sizeof(struct election_message);
1539         election_data.dptr  = (unsigned char *)&emsg;
1540
1541
1542         /* first we assume we will win the election and set 
1543            recoverymaster to be ourself on the current node
1544          */
1545         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1546         if (ret != 0) {
1547                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1548                 return -1;
1549         }
1550
1551
1552         /* send an election message to all active nodes */
1553         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1554
1555         return 0;
1556 }
1557
1558 /*
1559   this function will unban all nodes in the cluster
1560 */
1561 static void unban_all_nodes(struct ctdb_context *ctdb)
1562 {
1563         int ret, i;
1564         struct ctdb_node_map *nodemap;
1565         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1566         
1567         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1570                 return;
1571         }
1572
1573         for (i=0;i<nodemap->num;i++) {
1574                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1575                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1576                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1577                 }
1578         }
1579
1580         talloc_free(tmp_ctx);
1581 }
1582
1583
1584 /*
1585   we think we are winning the election - send a broadcast election request
1586  */
1587 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1588 {
1589         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1590         int ret;
1591
1592         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1593         if (ret != 0) {
1594                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1595         }
1596
1597         talloc_free(rec->send_election_te);
1598         rec->send_election_te = NULL;
1599 }
1600
1601 /*
1602   handler for recovery master elections
1603 */
1604 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1605                              TDB_DATA data, void *private_data)
1606 {
1607         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1608         int ret;
1609         struct election_message *em = (struct election_message *)data.dptr;
1610         TALLOC_CTX *mem_ctx;
1611
1612         /* we got an election packet - update the timeout for the election */
1613         talloc_free(rec->election_timeout);
1614         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1615                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1616                                                 ctdb_election_timeout, rec);
1617
1618         mem_ctx = talloc_new(ctdb);
1619
1620         /* someone called an election. check their election data
1621            and if we disagree and we would rather be the elected node, 
1622            send a new election message to all other nodes
1623          */
1624         if (ctdb_election_win(rec, em)) {
1625                 if (!rec->send_election_te) {
1626                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1627                                                                 timeval_current_ofs(0, 500000),
1628                                                                 election_send_request, rec);
1629                 }
1630                 talloc_free(mem_ctx);
1631                 /*unban_all_nodes(ctdb);*/
1632                 return;
1633         }
1634         
1635         /* we didn't win */
1636         talloc_free(rec->send_election_te);
1637         rec->send_election_te = NULL;
1638
1639         /* release the recmaster lock */
1640         if (em->pnn != ctdb->pnn &&
1641             ctdb->recovery_lock_fd != -1) {
1642                 close(ctdb->recovery_lock_fd);
1643                 ctdb->recovery_lock_fd = -1;
1644                 unban_all_nodes(ctdb);
1645         }
1646
1647         /* ok, let that guy become recmaster then */
1648         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1649         if (ret != 0) {
1650                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1651                 talloc_free(mem_ctx);
1652                 return;
1653         }
1654
1655         /* release any bans */
1656         rec->last_culprit = (uint32_t)-1;
1657         talloc_free(rec->banned_nodes);
1658         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1659         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1660
1661         talloc_free(mem_ctx);
1662         return;
1663 }
1664
1665
1666 /*
1667   force the start of the election process
1668  */
1669 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1670                            struct ctdb_node_map *nodemap)
1671 {
1672         int ret;
1673         struct ctdb_context *ctdb = rec->ctdb;
1674
1675         /* set all nodes to recovery mode to stop all internode traffic */
1676         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1677         if (ret!=0) {
1678                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1679                 return;
1680         }
1681
1682         talloc_free(rec->election_timeout);
1683         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1684                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1685                                                 ctdb_election_timeout, rec);
1686
1687         ret = send_election_request(rec, pnn);
1688         if (ret!=0) {
1689                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1690                 return;
1691         }
1692
1693         /* wait for a few seconds to collect all responses */
1694         ctdb_wait_election(rec);
1695 }
1696
1697
1698
1699 /*
1700   handler for when a node changes its flags
1701 */
1702 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1703                             TDB_DATA data, void *private_data)
1704 {
1705         int ret;
1706         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1707         struct ctdb_node_map *nodemap=NULL;
1708         TALLOC_CTX *tmp_ctx;
1709         uint32_t changed_flags;
1710         int i;
1711         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1712
1713         if (data.dsize != sizeof(*c)) {
1714                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1715                 return;
1716         }
1717
1718         tmp_ctx = talloc_new(ctdb);
1719         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1720
1721         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1722         if (ret != 0) {
1723                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1724                 talloc_free(tmp_ctx);
1725                 return;         
1726         }
1727
1728
1729         for (i=0;i<nodemap->num;i++) {
1730                 if (nodemap->nodes[i].pnn == c->pnn) break;
1731         }
1732
1733         if (i == nodemap->num) {
1734                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1735                 talloc_free(tmp_ctx);
1736                 return;
1737         }
1738
1739         changed_flags = c->old_flags ^ c->new_flags;
1740
1741         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1742            This flag is handled locally based on whether the local node
1743            can communicate with the node or not.
1744         */
1745         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1746         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1747                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1748         }
1749
1750         if (nodemap->nodes[i].flags != c->new_flags) {
1751                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1752         }
1753
1754         nodemap->nodes[i].flags = c->new_flags;
1755
1756         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1757                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1758
1759         if (ret == 0) {
1760                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1761                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1762         }
1763         
1764         if (ret == 0 &&
1765             ctdb->recovery_master == ctdb->pnn &&
1766             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1767             ctdb->vnn) {
1768                 /* Only do the takeover run if the perm disabled or unhealthy
1769                    flags changed since these will cause an ip failover but not
1770                    a recovery.
1771                    If the node became disconnected or banned this will also
1772                    lead to an ip address failover but that is handled 
1773                    during recovery
1774                 */
1775                 if (changed_flags & NODE_FLAGS_DISABLED) {
1776                         rec->need_takeover_run = true;
1777                 }
1778         }
1779
1780         talloc_free(tmp_ctx);
1781 }
1782
1783
1784
1785 struct verify_recmode_normal_data {
1786         uint32_t count;
1787         enum monitor_result status;
1788 };
1789
1790 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1791 {
1792         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1793
1794
1795         /* one more node has responded with recmode data*/
1796         rmdata->count--;
1797
1798         /* if we failed to get the recmode, then return an error and let
1799            the main loop try again.
1800         */
1801         if (state->state != CTDB_CONTROL_DONE) {
1802                 if (rmdata->status == MONITOR_OK) {
1803                         rmdata->status = MONITOR_FAILED;
1804                 }
1805                 return;
1806         }
1807
1808         /* if we got a response, then the recmode will be stored in the
1809            status field
1810         */
1811         if (state->status != CTDB_RECOVERY_NORMAL) {
1812                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1813                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1814         }
1815
1816         return;
1817 }
1818
1819
1820 /* verify that all nodes are in normal recovery mode */
1821 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1822 {
1823         struct verify_recmode_normal_data *rmdata;
1824         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1825         struct ctdb_client_control_state *state;
1826         enum monitor_result status;
1827         int j;
1828         
1829         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1830         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1831         rmdata->count  = 0;
1832         rmdata->status = MONITOR_OK;
1833
1834         /* loop over all active nodes and send an async getrecmode call to 
1835            them*/
1836         for (j=0; j<nodemap->num; j++) {
1837                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1838                         continue;
1839                 }
1840                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1841                                         CONTROL_TIMEOUT(), 
1842                                         nodemap->nodes[j].pnn);
1843                 if (state == NULL) {
1844                         /* we failed to send the control, treat this as 
1845                            an error and try again next iteration
1846                         */                      
1847                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1848                         talloc_free(mem_ctx);
1849                         return MONITOR_FAILED;
1850                 }
1851
1852                 /* set up the callback functions */
1853                 state->async.fn = verify_recmode_normal_callback;
1854                 state->async.private_data = rmdata;
1855
1856                 /* one more control to wait for to complete */
1857                 rmdata->count++;
1858         }
1859
1860
1861         /* now wait for up to the maximum number of seconds allowed
1862            or until all nodes we expect a response from has replied
1863         */
1864         while (rmdata->count > 0) {
1865                 event_loop_once(ctdb->ev);
1866         }
1867
1868         status = rmdata->status;
1869         talloc_free(mem_ctx);
1870         return status;
1871 }
1872
1873
1874 struct verify_recmaster_data {
1875         uint32_t count;
1876         uint32_t pnn;
1877         enum monitor_result status;
1878 };
1879
1880 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1881 {
1882         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1883
1884
1885         /* one more node has responded with recmaster data*/
1886         rmdata->count--;
1887
1888         /* if we failed to get the recmaster, then return an error and let
1889            the main loop try again.
1890         */
1891         if (state->state != CTDB_CONTROL_DONE) {
1892                 if (rmdata->status == MONITOR_OK) {
1893                         rmdata->status = MONITOR_FAILED;
1894                 }
1895                 return;
1896         }
1897
1898         /* if we got a response, then the recmaster will be stored in the
1899            status field
1900         */
1901         if (state->status != rmdata->pnn) {
1902                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1903                 rmdata->status = MONITOR_ELECTION_NEEDED;
1904         }
1905
1906         return;
1907 }
1908
1909
1910 /* verify that all nodes agree that we are the recmaster */
1911 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1912 {
1913         struct verify_recmaster_data *rmdata;
1914         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1915         struct ctdb_client_control_state *state;
1916         enum monitor_result status;
1917         int j;
1918         
1919         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1920         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1921         rmdata->count  = 0;
1922         rmdata->pnn    = pnn;
1923         rmdata->status = MONITOR_OK;
1924
1925         /* loop over all active nodes and send an async getrecmaster call to 
1926            them*/
1927         for (j=0; j<nodemap->num; j++) {
1928                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1929                         continue;
1930                 }
1931                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1932                                         CONTROL_TIMEOUT(),
1933                                         nodemap->nodes[j].pnn);
1934                 if (state == NULL) {
1935                         /* we failed to send the control, treat this as 
1936                            an error and try again next iteration
1937                         */                      
1938                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1939                         talloc_free(mem_ctx);
1940                         return MONITOR_FAILED;
1941                 }
1942
1943                 /* set up the callback functions */
1944                 state->async.fn = verify_recmaster_callback;
1945                 state->async.private_data = rmdata;
1946
1947                 /* one more control to wait for to complete */
1948                 rmdata->count++;
1949         }
1950
1951
1952         /* now wait for up to the maximum number of seconds allowed
1953            or until all nodes we expect a response from has replied
1954         */
1955         while (rmdata->count > 0) {
1956                 event_loop_once(ctdb->ev);
1957         }
1958
1959         status = rmdata->status;
1960         talloc_free(mem_ctx);
1961         return status;
1962 }
1963
1964 /*
1965   this function writes the number of connected nodes we have for this pnn
1966   to the pnn slot in the reclock file
1967 */
1968 static void
1969 ctdb_recoverd_write_pnn_connect_count(struct ctdb_recoverd *rec)
1970 {
1971         const char count = rec->num_active;
1972         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
1973
1974         if (pwrite(rec->rec_file_fd, &count, 1, ctdb->pnn) == -1) {
1975                 DEBUG(DEBUG_CRIT, (__location__ " Failed to write pnn count\n"));
1976         }
1977 }
1978
1979 /* 
1980   this function opens the reclock file and sets a byterage lock for the single
1981   byte at position pnn+1.
1982   the existence/non-existence of such a lock provides an alternative mechanism
1983   to know whether a remote node(recovery daemon) is running or not.
1984 */
1985 static void
1986 ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
1987 {
1988         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
1989         struct flock lock;
1990         char *pnnfile = NULL;
1991
1992         DEBUG(DEBUG_INFO, ("Setting PNN lock for pnn:%d\n", ctdb->pnn));
1993
1994         if (rec->rec_file_fd != -1) {
1995                 DEBUG(DEBUG_CRIT, (__location__ " rec_lock_fd is already open. Aborting\n"));
1996                 exit(10);
1997         }
1998
1999         pnnfile = talloc_asprintf(rec, "%s.pnn", ctdb->recovery_lock_file);
2000         CTDB_NO_MEMORY_FATAL(ctdb, pnnfile);
2001
2002         rec->rec_file_fd = open(pnnfile, O_RDWR|O_CREAT, 0600);
2003         if (rec->rec_file_fd == -1) {
2004                 DEBUG(DEBUG_CRIT,(__location__ " Unable to open %s - (%s)\n", 
2005                          pnnfile, strerror(errno)));
2006                 exit(10);
2007         }
2008
2009         set_close_on_exec(rec->rec_file_fd);
2010         lock.l_type = F_WRLCK;
2011         lock.l_whence = SEEK_SET;
2012         lock.l_start = ctdb->pnn;
2013         lock.l_len = 1;
2014         lock.l_pid = 0;
2015
2016         if (fcntl(rec->rec_file_fd, F_SETLK, &lock) != 0) {
2017                 close(rec->rec_file_fd);
2018                 rec->rec_file_fd = -1;
2019                 DEBUG(DEBUG_CRIT,(__location__ " Failed to get pnn lock on '%s'\n", pnnfile));
2020                 exit(10);
2021         }
2022
2023
2024         DEBUG(DEBUG_NOTICE,(__location__ " Got pnn lock on '%s'\n", pnnfile));
2025
2026         talloc_free(pnnfile);
2027
2028         /* we start out with 0 connected nodes */
2029         ctdb_recoverd_write_pnn_connect_count(rec);
2030 }
2031
2032 /*
2033   called when we need to do the periodical reclock pnn count update
2034  */
2035 static void ctdb_update_pnn_count(struct event_context *ev, struct timed_event *te, 
2036                                   struct timeval t, void *p)
2037 {
2038         int i, count;
2039         struct ctdb_recoverd *rec     = talloc_get_type(p, struct ctdb_recoverd);
2040         struct ctdb_context *ctdb     = rec->ctdb;
2041         struct ctdb_node_map *nodemap = rec->nodemap;
2042
2043         ctdb_recoverd_write_pnn_connect_count(rec);
2044
2045         event_add_timed(rec->ctdb->ev, rec->ctdb,
2046                 timeval_current_ofs(ctdb->tunable.reclock_ping_period, 0), 
2047                 ctdb_update_pnn_count, rec);
2048
2049         /* check if there is a split cluster and yeld the recmaster role
2050            it the other half of the cluster is larger 
2051         */
2052         DEBUG(DEBUG_DEBUG, ("CHECK FOR SPLIT CLUSTER\n"));
2053         if (rec->nodemap == NULL) {
2054                 return;
2055         }
2056         if (rec->rec_file_fd == -1) {
2057                 return;
2058         }
2059         /* only test this if we think we are the recmaster */
2060         if (ctdb->pnn != rec->recmaster) {
2061                 DEBUG(DEBUG_DEBUG, ("We are not recmaster, skip test\n"));
2062                 return;
2063         }
2064         if (ctdb->recovery_lock_fd == -1) {
2065                 return;
2066         }
2067         for (i=0; i<nodemap->num; i++) {
2068                 /* we dont need to check ourself */
2069                 if (nodemap->nodes[i].pnn == ctdb->pnn) {
2070                         continue;
2071                 }
2072                 /* dont check nodes that are connected to us */
2073                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2074                         continue;
2075                 }
2076                 /* check if the node is "connected" and how connected it it */
2077                 count = ctdb_read_pnn_lock(rec->rec_file_fd, nodemap->nodes[i].pnn);
2078                 if (count < 0) {
2079                         continue;
2080                 }
2081                 /* check if that node is more connected that us */
2082                 if (count > rec->num_active) {
2083                         DEBUG(DEBUG_ERR, ("DISCONNECTED Node %u is more connected than we are, yielding recmaster role\n", nodemap->nodes[i].pnn));
2084                         close(ctdb->recovery_lock_fd);
2085                         ctdb->recovery_lock_fd = -1;
2086                         force_election(rec, ctdb->pnn, rec->nodemap);
2087                         return;
2088                 }
2089         }
2090 }
2091
2092 /*
2093   the main monitoring loop
2094  */
2095 static void monitor_cluster(struct ctdb_context *ctdb)
2096 {
2097         uint32_t pnn;
2098         TALLOC_CTX *mem_ctx=NULL;
2099         struct ctdb_node_map *nodemap=NULL;
2100         struct ctdb_node_map *remote_nodemap=NULL;
2101         struct ctdb_vnn_map *vnnmap=NULL;
2102         struct ctdb_vnn_map *remote_vnnmap=NULL;
2103         int32_t debug_level;
2104         int i, j, ret;
2105         struct ctdb_recoverd *rec;
2106         struct ctdb_all_public_ips *ips;
2107         char c;
2108
2109         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2110
2111         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2112         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2113
2114         rec->ctdb = ctdb;
2115         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2116         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2117
2118         rec->priority_time = timeval_current();
2119
2120         /* open the rec file fd and lock our slot */
2121         rec->rec_file_fd = -1;
2122         ctdb_recoverd_get_pnn_lock(rec);
2123
2124         /* register a message port for recovery elections */
2125         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2126
2127         /* and one for when nodes are disabled/enabled */
2128         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
2129
2130         /* and one for when nodes are banned */
2131         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2132
2133         /* and one for when nodes are unbanned */
2134         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2135
2136         /* register a message port for vacuum fetch */
2137         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2138
2139         /* update the reclock pnn file connected count on a regular basis */
2140         event_add_timed(ctdb->ev, ctdb,
2141                 timeval_current_ofs(ctdb->tunable.reclock_ping_period, 0), 
2142                 ctdb_update_pnn_count, rec);
2143
2144 again:
2145         if (mem_ctx) {
2146                 talloc_free(mem_ctx);
2147                 mem_ctx = NULL;
2148         }
2149         mem_ctx = talloc_new(ctdb);
2150         if (!mem_ctx) {
2151                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2152                 exit(-1);
2153         }
2154
2155         /* we only check for recovery once every second */
2156         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2157
2158         /* verify that the main daemon is still running */
2159         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2160                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2161                 exit(-1);
2162         }
2163
2164         if (rec->election_timeout) {
2165                 /* an election is in progress */
2166                 goto again;
2167         }
2168
2169         /* read the debug level from the parent and update locally */
2170         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2171         if (ret !=0) {
2172                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2173                 goto again;
2174         }
2175         LogLevel = debug_level;
2176
2177
2178         /* We must check if we need to ban a node here but we want to do this
2179            as early as possible so we dont wait until we have pulled the node
2180            map from the local node. thats why we have the hardcoded value 20
2181         */
2182         if (rec->culprit_counter > 20) {
2183                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2184                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2185                          ctdb->tunable.recovery_ban_period));
2186                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2187         }
2188
2189         /* get relevant tunables */
2190         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2191         if (ret != 0) {
2192                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2193                 goto again;
2194         }
2195
2196         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2197         if (pnn == (uint32_t)-1) {
2198                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2199                 goto again;
2200         }
2201
2202         /* get the vnnmap */
2203         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2204         if (ret != 0) {
2205                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2206                 goto again;
2207         }
2208
2209
2210         /* get number of nodes */
2211         if (rec->nodemap) {
2212                 talloc_free(rec->nodemap);
2213                 rec->nodemap = NULL;
2214                 nodemap=NULL;
2215         }
2216         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2217         if (ret != 0) {
2218                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2219                 goto again;
2220         }
2221         nodemap = rec->nodemap;
2222
2223         /* check which node is the recovery master */
2224         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2225         if (ret != 0) {
2226                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2227                 goto again;
2228         }
2229
2230         if (rec->recmaster == (uint32_t)-1) {
2231                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2232                 force_election(rec, pnn, nodemap);
2233                 goto again;
2234         }
2235         
2236         /* check that we (recovery daemon) and the local ctdb daemon
2237            agrees on whether we are banned or not
2238         */
2239         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2240                 if (rec->banned_nodes[pnn] == NULL) {
2241                         if (rec->recmaster == pnn) {
2242                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2243
2244                                 ctdb_unban_node(rec, pnn);
2245                         } else {
2246                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2247                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2248                                 ctdb_set_culprit(rec, pnn);
2249                         }
2250                         goto again;
2251                 }
2252         } else {
2253                 if (rec->banned_nodes[pnn] != NULL) {
2254                         if (rec->recmaster == pnn) {
2255                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2256
2257                                 ctdb_unban_node(rec, pnn);
2258                         } else {
2259                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2260
2261                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2262                                 ctdb_set_culprit(rec, pnn);
2263                         }
2264                         goto again;
2265                 }
2266         }
2267
2268         /* remember our own node flags */
2269         rec->node_flags = nodemap->nodes[pnn].flags;
2270
2271         /* count how many active nodes there are */
2272         rec->num_active = 0;
2273         for (i=0; i<nodemap->num; i++) {
2274                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2275                         rec->num_active++;
2276                 }
2277         }
2278
2279
2280         /* verify that the recmaster node is still active */
2281         for (j=0; j<nodemap->num; j++) {
2282                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2283                         break;
2284                 }
2285         }
2286
2287         if (j == nodemap->num) {
2288                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2289                 force_election(rec, pnn, nodemap);
2290                 goto again;
2291         }
2292
2293         /* if recovery master is disconnected we must elect a new recmaster */
2294         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2295                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2296                 force_election(rec, pnn, nodemap);
2297                 goto again;
2298         }
2299
2300         /* grap the nodemap from the recovery master to check if it is banned */
2301         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2302                                    mem_ctx, &remote_nodemap);
2303         if (ret != 0) {
2304                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2305                           nodemap->nodes[j].pnn));
2306                 goto again;
2307         }
2308
2309
2310         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2311                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2312                 force_election(rec, pnn, nodemap);
2313                 goto again;
2314         }
2315
2316         /* verify that the public ip address allocation is consistent */
2317         if (ctdb->vnn != NULL) {
2318                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2319                 if (ret != 0) {
2320                         DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", i));
2321                         goto again;
2322                 }
2323                 for (j=0; j<ips->num; j++) {
2324                         /* verify that we have the ip addresses we should have
2325                            and we dont have ones we shouldnt have.
2326                            if we find an inconsistency we set recmode to
2327                            active on the local node and wait for the recmaster
2328                            to do a full blown recovery
2329                         */
2330                         if (ips->ips[j].pnn == pnn) {
2331                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
2332                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2333                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2334                                         if (ret != 0) {
2335                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2336                                                 goto again;
2337                                         }
2338                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2339                                         if (ret != 0) {
2340                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2341                                                 goto again;
2342                                         }
2343                                 }
2344                         } else {
2345                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
2346                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2347                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2348                                         if (ret != 0) {
2349                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2350                                                 goto again;
2351                                         }
2352                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2353                                         if (ret != 0) {
2354                                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2355                                                 goto again;
2356                                         }
2357                                 }
2358                         }
2359                 }
2360         }
2361
2362         /* if we are not the recmaster then we do not need to check
2363            if recovery is needed
2364          */
2365         if (pnn != rec->recmaster) {
2366                 goto again;
2367         }
2368
2369
2370         /* ensure our local copies of flags are right */
2371         ret = update_local_flags(rec, nodemap);
2372         if (ret == MONITOR_ELECTION_NEEDED) {
2373                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2374                 force_election(rec, pnn, nodemap);
2375                 goto again;
2376         }
2377         if (ret != MONITOR_OK) {
2378                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2379                 goto again;
2380         }
2381
2382         /* update the list of public ips that a node can handle for
2383            all connected nodes
2384         */
2385         for (j=0; j<nodemap->num; j++) {
2386                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2387                         continue;
2388                 }
2389                 /* release any existing data */
2390                 if (ctdb->nodes[j]->public_ips) {
2391                         talloc_free(ctdb->nodes[j]->public_ips);
2392                         ctdb->nodes[j]->public_ips = NULL;
2393                 }
2394                 /* grab a new shiny list of public ips from the node */
2395                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2396                         ctdb->nodes[j]->pnn, 
2397                         ctdb->nodes,
2398                         &ctdb->nodes[j]->public_ips)) {
2399                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2400                                 ctdb->nodes[j]->pnn));
2401                         goto again;
2402                 }
2403         }
2404
2405
2406         /* verify that all active nodes agree that we are the recmaster */
2407         switch (verify_recmaster(ctdb, nodemap, pnn)) {
2408         case MONITOR_RECOVERY_NEEDED:
2409                 /* can not happen */
2410                 goto again;
2411         case MONITOR_ELECTION_NEEDED:
2412                 force_election(rec, pnn, nodemap);
2413                 goto again;
2414         case MONITOR_OK:
2415                 break;
2416         case MONITOR_FAILED:
2417                 goto again;
2418         }
2419
2420
2421         if (rec->need_recovery) {
2422                 /* a previous recovery didn't finish */
2423                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2424                 goto again;             
2425         }
2426
2427         /* verify that all active nodes are in normal mode 
2428            and not in recovery mode 
2429          */
2430         switch (verify_recmode(ctdb, nodemap)) {
2431         case MONITOR_RECOVERY_NEEDED:
2432                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2433                 goto again;
2434         case MONITOR_FAILED:
2435                 goto again;
2436         case MONITOR_ELECTION_NEEDED:
2437                 /* can not happen */
2438         case MONITOR_OK:
2439                 break;
2440         }
2441
2442
2443         /* we should have the reclock - check its not stale */
2444         if (ctdb->recovery_lock_fd == -1) {
2445                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2446                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2447                 goto again;
2448         }
2449
2450         if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
2451                 DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2452                 close(ctdb->recovery_lock_fd);
2453                 ctdb->recovery_lock_fd = -1;
2454                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2455                 goto again;
2456         }
2457
2458         /* get the nodemap for all active remote nodes and verify
2459            they are the same as for this node
2460          */
2461         for (j=0; j<nodemap->num; j++) {
2462                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2463                         continue;
2464                 }
2465                 if (nodemap->nodes[j].pnn == pnn) {
2466                         continue;
2467                 }
2468
2469                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2470                                            mem_ctx, &remote_nodemap);
2471                 if (ret != 0) {
2472                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
2473                                   nodemap->nodes[j].pnn));
2474                         goto again;
2475                 }
2476
2477                 /* if the nodes disagree on how many nodes there are
2478                    then this is a good reason to try recovery
2479                  */
2480                 if (remote_nodemap->num != nodemap->num) {
2481                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2482                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2483                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2484                         goto again;
2485                 }
2486
2487                 /* if the nodes disagree on which nodes exist and are
2488                    active, then that is also a good reason to do recovery
2489                  */
2490                 for (i=0;i<nodemap->num;i++) {
2491                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2492                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2493                                           nodemap->nodes[j].pnn, i, 
2494                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2495                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2496                                             vnnmap, nodemap->nodes[j].pnn);
2497                                 goto again;
2498                         }
2499                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2500                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2501                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2502                                           nodemap->nodes[j].pnn, i,
2503                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2504                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2505                                             vnnmap, nodemap->nodes[j].pnn);
2506                                 goto again;
2507                         }
2508                 }
2509
2510         }
2511
2512
2513         /* there better be the same number of lmasters in the vnn map
2514            as there are active nodes or we will have to do a recovery
2515          */
2516         if (vnnmap->size != rec->num_active) {
2517                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2518                           vnnmap->size, rec->num_active));
2519                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2520                 goto again;
2521         }
2522
2523         /* verify that all active nodes in the nodemap also exist in 
2524            the vnnmap.
2525          */
2526         for (j=0; j<nodemap->num; j++) {
2527                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2528                         continue;
2529                 }
2530                 if (nodemap->nodes[j].pnn == pnn) {
2531                         continue;
2532                 }
2533
2534                 for (i=0; i<vnnmap->size; i++) {
2535                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2536                                 break;
2537                         }
2538                 }
2539                 if (i == vnnmap->size) {
2540                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2541                                   nodemap->nodes[j].pnn));
2542                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2543                         goto again;
2544                 }
2545         }
2546
2547         
2548         /* verify that all other nodes have the same vnnmap
2549            and are from the same generation
2550          */
2551         for (j=0; j<nodemap->num; j++) {
2552                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2553                         continue;
2554                 }
2555                 if (nodemap->nodes[j].pnn == pnn) {
2556                         continue;
2557                 }
2558
2559                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2560                                           mem_ctx, &remote_vnnmap);
2561                 if (ret != 0) {
2562                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2563                                   nodemap->nodes[j].pnn));
2564                         goto again;
2565                 }
2566
2567                 /* verify the vnnmap generation is the same */
2568                 if (vnnmap->generation != remote_vnnmap->generation) {
2569                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2570                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2571                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2572                         goto again;
2573                 }
2574
2575                 /* verify the vnnmap size is the same */
2576                 if (vnnmap->size != remote_vnnmap->size) {
2577                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2578                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2579                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2580                         goto again;
2581                 }
2582
2583                 /* verify the vnnmap is the same */
2584                 for (i=0;i<vnnmap->size;i++) {
2585                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2586                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2587                                           nodemap->nodes[j].pnn));
2588                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2589                                             vnnmap, nodemap->nodes[j].pnn);
2590                                 goto again;
2591                         }
2592                 }
2593         }
2594
2595         /* we might need to change who has what IP assigned */
2596         if (rec->need_takeover_run) {
2597                 rec->need_takeover_run = false;
2598
2599                 /* execute the "startrecovery" event script on all nodes */
2600                 ret = run_startrecovery_eventscript(ctdb, nodemap);
2601                 if (ret!=0) {
2602                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
2603                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2604                                     vnnmap, ctdb->pnn);
2605                 }
2606
2607                 ret = ctdb_takeover_run(ctdb, nodemap);
2608                 if (ret != 0) {
2609                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2610                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2611                                     vnnmap, ctdb->pnn);
2612                 }
2613
2614                 /* execute the "recovered" event script on all nodes */
2615                 ret = run_recovered_eventscript(ctdb, nodemap);
2616                 if (ret!=0) {
2617                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster\n"));
2618                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2619                                     vnnmap, ctdb->pnn);
2620                 }
2621         }
2622
2623         goto again;
2624
2625 }
2626
2627 /*
2628   event handler for when the main ctdbd dies
2629  */
2630 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2631                                  uint16_t flags, void *private_data)
2632 {
2633         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
2634         _exit(1);
2635 }
2636
2637 /*
2638   startup the recovery daemon as a child of the main ctdb daemon
2639  */
2640 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2641 {
2642         int ret;
2643         int fd[2];
2644
2645         if (pipe(fd) != 0) {
2646                 return -1;
2647         }
2648
2649         ctdb->ctdbd_pid = getpid();
2650
2651         ctdb->recoverd_pid = fork();
2652         if (ctdb->recoverd_pid == -1) {
2653                 return -1;
2654         }
2655         
2656         if (ctdb->recoverd_pid != 0) {
2657                 close(fd[0]);
2658                 return 0;
2659         }
2660
2661         close(fd[1]);
2662
2663         /* shutdown the transport */
2664         ctdb->methods->shutdown(ctdb);
2665
2666         /* get a new event context */
2667         talloc_free(ctdb->ev);
2668         ctdb->ev = event_context_init(ctdb);
2669
2670         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2671                      ctdb_recoverd_parent, &fd[0]);     
2672
2673         close(ctdb->daemon.sd);
2674         ctdb->daemon.sd = -1;
2675
2676         srandom(getpid() ^ time(NULL));
2677
2678         /* the recovery daemon does not need to be realtime */
2679         if (ctdb->do_setsched) {
2680                 ctdb_restore_scheduler(ctdb);
2681         }
2682
2683         /* initialise ctdb */
2684         ret = ctdb_socket_connect(ctdb);
2685         if (ret != 0) {
2686                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
2687                 exit(1);
2688         }
2689
2690         monitor_cluster(ctdb);
2691
2692         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
2693         return -1;
2694 }
2695
2696 /*
2697   shutdown the recovery daemon
2698  */
2699 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2700 {
2701         if (ctdb->recoverd_pid == 0) {
2702                 return;
2703         }
2704
2705         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
2706         kill(ctdb->recoverd_pid, SIGTERM);
2707 }