added debug constants to allow for better mapping to syslog levels
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         uint32_t last_culprit;
45         uint32_t culprit_counter;
46         struct timeval first_recover_time;
47         struct ban_state **banned_nodes;
48         struct timeval priority_time;
49         bool need_takeover_run;
50         bool need_recovery;
51         uint32_t node_flags;
52         struct timed_event *send_election_te;
53         struct timed_event *election_timeout;
54         struct vacuum_info *vacuum_info;
55 };
56
57 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
58 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
59
60
61 /*
62   unban a node
63  */
64 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
65 {
66         struct ctdb_context *ctdb = rec->ctdb;
67
68         DEBUG(0,("Unbanning node %u\n", pnn));
69
70         if (!ctdb_validate_pnn(ctdb, pnn)) {
71                 DEBUG(0,("Bad pnn %u in ctdb_unban_node\n", pnn));
72                 return;
73         }
74
75         /* If we are unbanning a different node then just pass the ban info on */
76         if (pnn != ctdb->pnn) {
77                 TDB_DATA data;
78                 int ret;
79                 
80                 DEBUG(0,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
81
82                 data.dptr = (uint8_t *)&pnn;
83                 data.dsize = sizeof(uint32_t);
84
85                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
86                 if (ret != 0) {
87                         DEBUG(0,("Failed to unban node %u\n", pnn));
88                         return;
89                 }
90
91                 return;
92         }
93
94         /* make sure we remember we are no longer banned in case 
95            there is an election */
96         rec->node_flags &= ~NODE_FLAGS_BANNED;
97
98         DEBUG(0,("Clearing ban flag on node %u\n", pnn));
99         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
100
101         if (rec->banned_nodes[pnn] == NULL) {
102                 DEBUG(0,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
103                 return;
104         }
105
106         talloc_free(rec->banned_nodes[pnn]);
107         rec->banned_nodes[pnn] = NULL;
108 }
109
110
111 /*
112   called when a ban has timed out
113  */
114 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
115 {
116         struct ban_state *state = talloc_get_type(p, struct ban_state);
117         struct ctdb_recoverd *rec = state->rec;
118         uint32_t pnn = state->banned_node;
119
120         DEBUG(0,("Ban timeout. Node %u is now unbanned\n", pnn));
121         ctdb_unban_node(rec, pnn);
122 }
123
124 /*
125   ban a node for a period of time
126  */
127 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
128 {
129         struct ctdb_context *ctdb = rec->ctdb;
130
131         DEBUG(0,("Banning node %u for %u seconds\n", pnn, ban_time));
132
133         if (!ctdb_validate_pnn(ctdb, pnn)) {
134                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
135                 return;
136         }
137
138         if (0 == ctdb->tunable.enable_bans) {
139                 DEBUG(0,("Bans are disabled - ignoring ban of node %u\n", pnn));
140                 return;
141         }
142
143         /* If we are banning a different node then just pass the ban info on */
144         if (pnn != ctdb->pnn) {
145                 struct ctdb_ban_info b;
146                 TDB_DATA data;
147                 int ret;
148                 
149                 DEBUG(0,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
150
151                 b.pnn = pnn;
152                 b.ban_time = ban_time;
153
154                 data.dptr = (uint8_t *)&b;
155                 data.dsize = sizeof(b);
156
157                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
158                 if (ret != 0) {
159                         DEBUG(0,("Failed to ban node %u\n", pnn));
160                         return;
161                 }
162
163                 return;
164         }
165
166         DEBUG(0,("self ban - lowering our election priority\n"));
167         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
168
169         /* banning ourselves - lower our election priority */
170         rec->priority_time = timeval_current();
171
172         /* make sure we remember we are banned in case there is an 
173            election */
174         rec->node_flags |= NODE_FLAGS_BANNED;
175
176         if (rec->banned_nodes[pnn] != NULL) {
177                 DEBUG(0,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));               
178                 talloc_free(rec->banned_nodes[pnn]);
179                 rec->banned_nodes[pnn] = NULL;
180         }
181
182         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
183         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
184
185         rec->banned_nodes[pnn]->rec = rec;
186         rec->banned_nodes[pnn]->banned_node = pnn;
187
188         if (ban_time != 0) {
189                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
190                                 timeval_current_ofs(ban_time, 0),
191                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
192         }
193 }
194
195 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
196
197
198 /*
199   run the "recovered" eventscript on all nodes
200  */
201 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
202 {
203         TALLOC_CTX *tmp_ctx;
204
205         tmp_ctx = talloc_new(ctdb);
206         CTDB_NO_MEMORY(ctdb, tmp_ctx);
207
208         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
209                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
210                         CONTROL_TIMEOUT(), false, tdb_null) != 0) {
211                 DEBUG(0, (__location__ " Unable to run the 'recovered' event. Recovery failed.\n"));
212                 talloc_free(tmp_ctx);
213                 return -1;
214         }
215
216         talloc_free(tmp_ctx);
217         return 0;
218 }
219
220 /*
221   run the "startrecovery" eventscript on all nodes
222  */
223 static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
224 {
225         TALLOC_CTX *tmp_ctx;
226
227         tmp_ctx = talloc_new(ctdb);
228         CTDB_NO_MEMORY(ctdb, tmp_ctx);
229
230         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
231                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
232                         CONTROL_TIMEOUT(), false, tdb_null) != 0) {
233                 DEBUG(0, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
234                 talloc_free(tmp_ctx);
235                 return -1;
236         }
237
238         talloc_free(tmp_ctx);
239         return 0;
240 }
241
242 /*
243   change recovery mode on all nodes
244  */
245 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
246 {
247         TDB_DATA data;
248         uint32_t *nodes;
249         TALLOC_CTX *tmp_ctx;
250
251         tmp_ctx = talloc_new(ctdb);
252         CTDB_NO_MEMORY(ctdb, tmp_ctx);
253
254         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
255
256         /* freeze all nodes */
257         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
258                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
259                                                 nodes, CONTROL_TIMEOUT(),
260                                                 false, tdb_null) != 0) {
261                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
262                         talloc_free(tmp_ctx);
263                         return -1;
264                 }
265         }
266
267
268         data.dsize = sizeof(uint32_t);
269         data.dptr = (unsigned char *)&rec_mode;
270
271         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
272                                         nodes, CONTROL_TIMEOUT(),
273                                         false, data) != 0) {
274                 DEBUG(0, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
275                 talloc_free(tmp_ctx);
276                 return -1;
277         }
278
279         if (rec_mode == CTDB_RECOVERY_NORMAL) {
280                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
281                                                 nodes, CONTROL_TIMEOUT(),
282                                                 false, tdb_null) != 0) {
283                         DEBUG(0, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
284                         talloc_free(tmp_ctx);
285                         return -1;
286                 }
287         }
288
289         talloc_free(tmp_ctx);
290         return 0;
291 }
292
293 /*
294   change recovery master on all node
295  */
296 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
297 {
298         TDB_DATA data;
299         TALLOC_CTX *tmp_ctx;
300
301         tmp_ctx = talloc_new(ctdb);
302         CTDB_NO_MEMORY(ctdb, tmp_ctx);
303
304         data.dsize = sizeof(uint32_t);
305         data.dptr = (unsigned char *)&pnn;
306
307         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
308                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
309                         CONTROL_TIMEOUT(), false, data) != 0) {
310                 DEBUG(0, (__location__ " Unable to set recmaster. Recovery failed.\n"));
311                 talloc_free(tmp_ctx);
312                 return -1;
313         }
314
315         talloc_free(tmp_ctx);
316         return 0;
317 }
318
319
320 /*
321   ensure all other nodes have attached to any databases that we have
322  */
323 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
324                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
325 {
326         int i, j, db, ret;
327         struct ctdb_dbid_map *remote_dbmap;
328
329         /* verify that all other nodes have all our databases */
330         for (j=0; j<nodemap->num; j++) {
331                 /* we dont need to ourself ourselves */
332                 if (nodemap->nodes[j].pnn == pnn) {
333                         continue;
334                 }
335                 /* dont check nodes that are unavailable */
336                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
337                         continue;
338                 }
339
340                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
341                                          mem_ctx, &remote_dbmap);
342                 if (ret != 0) {
343                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
344                         return -1;
345                 }
346
347                 /* step through all local databases */
348                 for (db=0; db<dbmap->num;db++) {
349                         const char *name;
350
351
352                         for (i=0;i<remote_dbmap->num;i++) {
353                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
354                                         break;
355                                 }
356                         }
357                         /* the remote node already have this database */
358                         if (i!=remote_dbmap->num) {
359                                 continue;
360                         }
361                         /* ok so we need to create this database */
362                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
363                                             mem_ctx, &name);
364                         if (ret != 0) {
365                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
366                                 return -1;
367                         }
368                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
369                                            mem_ctx, name, dbmap->dbs[db].persistent);
370                         if (ret != 0) {
371                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
372                                 return -1;
373                         }
374                 }
375         }
376
377         return 0;
378 }
379
380
381 /*
382   ensure we are attached to any databases that anyone else is attached to
383  */
384 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
385                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
386 {
387         int i, j, db, ret;
388         struct ctdb_dbid_map *remote_dbmap;
389
390         /* verify that we have all database any other node has */
391         for (j=0; j<nodemap->num; j++) {
392                 /* we dont need to ourself ourselves */
393                 if (nodemap->nodes[j].pnn == pnn) {
394                         continue;
395                 }
396                 /* dont check nodes that are unavailable */
397                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
398                         continue;
399                 }
400
401                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
402                                          mem_ctx, &remote_dbmap);
403                 if (ret != 0) {
404                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
405                         return -1;
406                 }
407
408                 /* step through all databases on the remote node */
409                 for (db=0; db<remote_dbmap->num;db++) {
410                         const char *name;
411
412                         for (i=0;i<(*dbmap)->num;i++) {
413                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
414                                         break;
415                                 }
416                         }
417                         /* we already have this db locally */
418                         if (i!=(*dbmap)->num) {
419                                 continue;
420                         }
421                         /* ok so we need to create this database and
422                            rebuild dbmap
423                          */
424                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
425                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
426                         if (ret != 0) {
427                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
428                                           nodemap->nodes[j].pnn));
429                                 return -1;
430                         }
431                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
432                                            remote_dbmap->dbs[db].persistent);
433                         if (ret != 0) {
434                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
435                                 return -1;
436                         }
437                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
438                         if (ret != 0) {
439                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
440                                 return -1;
441                         }
442                 }
443         }
444
445         return 0;
446 }
447
448
449 /*
450   pull the remote database contents from one node into the recdb
451  */
452 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
453                                     struct tdb_wrap *recdb, uint32_t dbid)
454 {
455         int ret;
456         TDB_DATA outdata;
457         struct ctdb_control_pulldb_reply *reply;
458         struct ctdb_rec_data *rec;
459         int i;
460         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
461
462         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
463                                CONTROL_TIMEOUT(), &outdata);
464         if (ret != 0) {
465                 DEBUG(0,(__location__ " Unable to copy db from node %u\n", srcnode));
466                 talloc_free(tmp_ctx);
467                 return -1;
468         }
469
470         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
471
472         if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
473                 DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
474                 talloc_free(tmp_ctx);
475                 return -1;
476         }
477         
478         rec = (struct ctdb_rec_data *)&reply->data[0];
479         
480         for (i=0;
481              i<reply->count;
482              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
483                 TDB_DATA key, data;
484                 struct ctdb_ltdb_header *hdr;
485                 TDB_DATA existing;
486                 
487                 key.dptr = &rec->data[0];
488                 key.dsize = rec->keylen;
489                 data.dptr = &rec->data[key.dsize];
490                 data.dsize = rec->datalen;
491                 
492                 hdr = (struct ctdb_ltdb_header *)data.dptr;
493
494                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
495                         DEBUG(0,(__location__ " bad ltdb record\n"));
496                         talloc_free(tmp_ctx);
497                         return -1;
498                 }
499
500                 /* fetch the existing record, if any */
501                 existing = tdb_fetch(recdb->tdb, key);
502                 
503                 if (existing.dptr != NULL) {
504                         struct ctdb_ltdb_header header;
505                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
506                                 DEBUG(0,(__location__ " Bad record size %u from node %u\n", 
507                                          (unsigned)existing.dsize, srcnode));
508                                 free(existing.dptr);
509                                 talloc_free(tmp_ctx);
510                                 return -1;
511                         }
512                         header = *(struct ctdb_ltdb_header *)existing.dptr;
513                         free(existing.dptr);
514                         if (!(header.rsn < hdr->rsn ||
515                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
516                                 continue;
517                         }
518                 }
519                 
520                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
521                         DEBUG(0,(__location__ " Failed to store record\n"));
522                         talloc_free(tmp_ctx);
523                         return -1;                              
524                 }
525         }
526
527         talloc_free(tmp_ctx);
528
529         return 0;
530 }
531
532 /*
533   pull all the remote database contents into the recdb
534  */
535 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
536                                 struct tdb_wrap *recdb, uint32_t dbid)
537 {
538         int j;
539
540         /* pull all records from all other nodes across onto this node
541            (this merges based on rsn)
542         */
543         for (j=0; j<nodemap->num; j++) {
544                 /* dont merge from nodes that are unavailable */
545                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
546                         continue;
547                 }
548                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
549                         DEBUG(0,(__location__ " Failed to pull remote database from node %u\n", 
550                                  nodemap->nodes[j].pnn));
551                         return -1;
552                 }
553         }
554         
555         return 0;
556 }
557
558
559 /*
560   update flags on all active nodes
561  */
562 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
563 {
564         int i;
565         for (i=0;i<nodemap->num;i++) {
566                 struct ctdb_node_flag_change c;
567                 TDB_DATA data;
568
569                 c.pnn = nodemap->nodes[i].pnn;
570                 c.old_flags = nodemap->nodes[i].flags;
571                 c.new_flags = nodemap->nodes[i].flags;
572
573                 data.dptr = (uint8_t *)&c;
574                 data.dsize = sizeof(c);
575
576                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
577                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
578
579         }
580         return 0;
581 }
582
583
584 /*
585   ensure all nodes have the same vnnmap we do
586  */
587 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
588                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
589 {
590         int j, ret;
591
592         /* push the new vnn map out to all the nodes */
593         for (j=0; j<nodemap->num; j++) {
594                 /* dont push to nodes that are unavailable */
595                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
596                         continue;
597                 }
598
599                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
600                 if (ret != 0) {
601                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
602                         return -1;
603                 }
604         }
605
606         return 0;
607 }
608
609
610 /*
611   handler for when the admin bans a node
612 */
613 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
614                         TDB_DATA data, void *private_data)
615 {
616         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
617         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
618         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
619
620         if (data.dsize != sizeof(*b)) {
621                 DEBUG(0,("Bad data in ban_handler\n"));
622                 talloc_free(mem_ctx);
623                 return;
624         }
625
626         if (b->pnn != ctdb->pnn) {
627                 DEBUG(0,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
628                 return;
629         }
630
631         DEBUG(0,("Node %u has been banned for %u seconds\n", 
632                  b->pnn, b->ban_time));
633
634         ctdb_ban_node(rec, b->pnn, b->ban_time);
635         talloc_free(mem_ctx);
636 }
637
638 /*
639   handler for when the admin unbans a node
640 */
641 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
642                           TDB_DATA data, void *private_data)
643 {
644         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
645         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
646         uint32_t pnn;
647
648         if (data.dsize != sizeof(uint32_t)) {
649                 DEBUG(0,("Bad data in unban_handler\n"));
650                 talloc_free(mem_ctx);
651                 return;
652         }
653         pnn = *(uint32_t *)data.dptr;
654
655         if (pnn != ctdb->pnn) {
656                 DEBUG(0,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
657                 return;
658         }
659
660         DEBUG(0,("Node %u has been unbanned.\n", pnn));
661         ctdb_unban_node(rec, pnn);
662         talloc_free(mem_ctx);
663 }
664
665
666 struct vacuum_info {
667         struct vacuum_info *next, *prev;
668         struct ctdb_recoverd *rec;
669         uint32_t srcnode;
670         struct ctdb_db_context *ctdb_db;
671         struct ctdb_control_pulldb_reply *recs;
672         struct ctdb_rec_data *r;
673 };
674
675 static void vacuum_fetch_next(struct vacuum_info *v);
676
677 /*
678   called when a vacuum fetch has completed - just free it and do the next one
679  */
680 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
681 {
682         struct vacuum_info *v = talloc_get_type(state->async.private, struct vacuum_info);
683         talloc_free(state);
684         vacuum_fetch_next(v);
685 }
686
687
688 /*
689   process the next element from the vacuum list
690 */
691 static void vacuum_fetch_next(struct vacuum_info *v)
692 {
693         struct ctdb_call call;
694         struct ctdb_rec_data *r;
695
696         while (v->recs->count) {
697                 struct ctdb_client_call_state *state;
698                 TDB_DATA data;
699                 struct ctdb_ltdb_header *hdr;
700
701                 ZERO_STRUCT(call);
702                 call.call_id = CTDB_NULL_FUNC;
703                 call.flags = CTDB_IMMEDIATE_MIGRATION;
704
705                 r = v->r;
706                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
707                 v->recs->count--;
708
709                 call.key.dptr = &r->data[0];
710                 call.key.dsize = r->keylen;
711
712                 /* ensure we don't block this daemon - just skip a record if we can't get
713                    the chainlock */
714                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
715                         continue;
716                 }
717
718                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
719                 if (data.dptr == NULL) {
720                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
721                         continue;
722                 }
723
724                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
725                         free(data.dptr);
726                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
727                         continue;
728                 }
729                 
730                 hdr = (struct ctdb_ltdb_header *)data.dptr;
731                 if (hdr->dmaster == v->rec->ctdb->pnn) {
732                         /* its already local */
733                         free(data.dptr);
734                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
735                         continue;
736                 }
737
738                 free(data.dptr);
739
740                 state = ctdb_call_send(v->ctdb_db, &call);
741                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
742                 if (state == NULL) {
743                         DEBUG(0,(__location__ " Failed to setup vacuum fetch call\n"));
744                         talloc_free(v);
745                         return;
746                 }
747                 state->async.fn = vacuum_fetch_callback;
748                 state->async.private = v;
749                 return;
750         }
751
752         talloc_free(v);
753 }
754
755
756 /*
757   destroy a vacuum info structure
758  */
759 static int vacuum_info_destructor(struct vacuum_info *v)
760 {
761         DLIST_REMOVE(v->rec->vacuum_info, v);
762         return 0;
763 }
764
765
766 /*
767   handler for vacuum fetch
768 */
769 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
770                                  TDB_DATA data, void *private_data)
771 {
772         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
773         struct ctdb_control_pulldb_reply *recs;
774         int ret, i;
775         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
776         const char *name;
777         struct ctdb_dbid_map *dbmap=NULL;
778         bool persistent = false;
779         struct ctdb_db_context *ctdb_db;
780         struct ctdb_rec_data *r;
781         uint32_t srcnode;
782         struct vacuum_info *v;
783
784         recs = (struct ctdb_control_pulldb_reply *)data.dptr;
785         r = (struct ctdb_rec_data *)&recs->data[0];
786
787         if (recs->count == 0) {
788                 return;
789         }
790
791         srcnode = r->reqid;
792
793         for (v=rec->vacuum_info;v;v=v->next) {
794                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
795                         /* we're already working on records from this node */
796                         return;
797                 }
798         }
799
800         /* work out if the database is persistent */
801         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
802         if (ret != 0) {
803                 DEBUG(0, (__location__ " Unable to get dbids from local node\n"));
804                 talloc_free(tmp_ctx);
805                 return;
806         }
807
808         for (i=0;i<dbmap->num;i++) {
809                 if (dbmap->dbs[i].dbid == recs->db_id) {
810                         persistent = dbmap->dbs[i].persistent;
811                         break;
812                 }
813         }
814         if (i == dbmap->num) {
815                 DEBUG(0, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
816                 talloc_free(tmp_ctx);
817                 return;         
818         }
819
820         /* find the name of this database */
821         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
822                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
823                 talloc_free(tmp_ctx);
824                 return;
825         }
826
827         /* attach to it */
828         ctdb_db = ctdb_attach(ctdb, name, persistent);
829         if (ctdb_db == NULL) {
830                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
831                 talloc_free(tmp_ctx);
832                 return;
833         }
834
835         v = talloc_zero(rec, struct vacuum_info);
836         if (v == NULL) {
837                 DEBUG(0,(__location__ " Out of memory\n"));
838                 return;
839         }
840
841         v->rec = rec;
842         v->srcnode = srcnode;
843         v->ctdb_db = ctdb_db;
844         v->recs = talloc_memdup(v, recs, data.dsize);
845         if (v->recs == NULL) {
846                 DEBUG(0,(__location__ " Out of memory\n"));
847                 talloc_free(v);
848                 return;         
849         }
850         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
851
852         DLIST_ADD(rec->vacuum_info, v);
853
854         talloc_set_destructor(v, vacuum_info_destructor);
855
856         vacuum_fetch_next(v);
857 }
858
859
860 /*
861   called when ctdb_wait_timeout should finish
862  */
863 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
864                               struct timeval yt, void *p)
865 {
866         uint32_t *timed_out = (uint32_t *)p;
867         (*timed_out) = 1;
868 }
869
870 /*
871   wait for a given number of seconds
872  */
873 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
874 {
875         uint32_t timed_out = 0;
876         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
877         while (!timed_out) {
878                 event_loop_once(ctdb->ev);
879         }
880 }
881
882 /*
883   called when an election times out (ends)
884  */
885 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
886                                   struct timeval t, void *p)
887 {
888         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
889         rec->election_timeout = NULL;
890 }
891
892
893 /*
894   wait for an election to finish. It finished election_timeout seconds after
895   the last election packet is received
896  */
897 static void ctdb_wait_election(struct ctdb_recoverd *rec)
898 {
899         struct ctdb_context *ctdb = rec->ctdb;
900         while (rec->election_timeout) {
901                 event_loop_once(ctdb->ev);
902         }
903 }
904
905 /*
906   remember the trouble maker
907  */
908 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
909 {
910         struct ctdb_context *ctdb = rec->ctdb;
911
912         if (rec->last_culprit != culprit ||
913             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
914                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
915                 /* either a new node is the culprit, or we've decided to forgive them */
916                 rec->last_culprit = culprit;
917                 rec->first_recover_time = timeval_current();
918                 rec->culprit_counter = 0;
919         }
920         rec->culprit_counter++;
921 }
922
923 /*
924   Update our local flags from all remote connected nodes. 
925   This is only run when we are or we belive we are the recovery master
926  */
927 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
928 {
929         int j;
930         struct ctdb_context *ctdb = rec->ctdb;
931         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
932
933         /* get the nodemap for all active remote nodes and verify
934            they are the same as for this node
935          */
936         for (j=0; j<nodemap->num; j++) {
937                 struct ctdb_node_map *remote_nodemap=NULL;
938                 int ret;
939
940                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
941                         continue;
942                 }
943                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
944                         continue;
945                 }
946
947                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
948                                            mem_ctx, &remote_nodemap);
949                 if (ret != 0) {
950                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
951                                   nodemap->nodes[j].pnn));
952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
953                         talloc_free(mem_ctx);
954                         return MONITOR_FAILED;
955                 }
956                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
957                         struct ctdb_node_flag_change c;
958                         TDB_DATA data;
959
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         c.pnn = nodemap->nodes[j].pnn;
967                         c.old_flags = nodemap->nodes[j].flags;
968                         c.new_flags = remote_nodemap->nodes[j].flags;
969
970                         data.dptr = (uint8_t *)&c;
971                         data.dsize = sizeof(c);
972
973                         ctdb_send_message(ctdb, ctdb->pnn,
974                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
975                                         data);
976
977                         /* Update our local copy of the flags in the recovery
978                            daemon.
979                         */
980                         DEBUG(0,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
981                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
982                                  nodemap->nodes[j].flags));
983                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
984
985                         /* If the BANNED flag has changed for the node
986                            this is a good reason to do a new election.
987                          */
988                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
989                                 DEBUG(0,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
990                                  nodemap->nodes[j].pnn, c.new_flags,
991                                  c.old_flags));
992                                 talloc_free(mem_ctx);
993                                 return MONITOR_ELECTION_NEEDED;
994                         }
995
996                 }
997                 talloc_free(remote_nodemap);
998         }
999         talloc_free(mem_ctx);
1000         return MONITOR_OK;
1001 }
1002
1003
1004 /* Create a new random generation ip. 
1005    The generation id can not be the INVALID_GENERATION id
1006 */
1007 static uint32_t new_generation(void)
1008 {
1009         uint32_t generation;
1010
1011         while (1) {
1012                 generation = random();
1013
1014                 if (generation != INVALID_GENERATION) {
1015                         break;
1016                 }
1017         }
1018
1019         return generation;
1020 }
1021
1022
1023 /*
1024   create a temporary working database
1025  */
1026 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1027 {
1028         char *name;
1029         struct tdb_wrap *recdb;
1030
1031         /* open up the temporary recovery database */
1032         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1033         if (name == NULL) {
1034                 return NULL;
1035         }
1036         unlink(name);
1037         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1038                               TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
1039         if (recdb == NULL) {
1040                 DEBUG(0,(__location__ " Failed to create temp recovery database '%s'\n", name));
1041         }
1042
1043         talloc_free(name);
1044
1045         return recdb;
1046 }
1047
1048
1049 /* 
1050    a traverse function for pulling all relevent records from recdb
1051  */
1052 struct recdb_data {
1053         struct ctdb_context *ctdb;
1054         struct ctdb_control_pulldb_reply *recdata;
1055         uint32_t len;
1056         bool failed;
1057 };
1058
1059 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1060 {
1061         struct recdb_data *params = (struct recdb_data *)p;
1062         struct ctdb_rec_data *rec;
1063         struct ctdb_ltdb_header *hdr;
1064
1065         /* skip empty records */
1066         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1067                 return 0;
1068         }
1069
1070         /* update the dmaster field to point to us */
1071         hdr = (struct ctdb_ltdb_header *)data.dptr;
1072         hdr->dmaster = params->ctdb->pnn;
1073
1074         /* add the record to the blob ready to send to the nodes */
1075         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1076         if (rec == NULL) {
1077                 params->failed = true;
1078                 return -1;
1079         }
1080         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1081         if (params->recdata == NULL) {
1082                 DEBUG(0,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1083                          rec->length + params->len, params->recdata->count));
1084                 params->failed = true;
1085                 return -1;
1086         }
1087         params->recdata->count++;
1088         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1089         params->len += rec->length;
1090         talloc_free(rec);
1091
1092         return 0;
1093 }
1094
1095 /*
1096   push the recdb database out to all nodes
1097  */
1098 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1099                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1100 {
1101         struct recdb_data params;
1102         struct ctdb_control_pulldb_reply *recdata;
1103         TDB_DATA outdata;
1104         TALLOC_CTX *tmp_ctx;
1105
1106         tmp_ctx = talloc_new(ctdb);
1107         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1108
1109         recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
1110         CTDB_NO_MEMORY(ctdb, recdata);
1111
1112         recdata->db_id = dbid;
1113
1114         params.ctdb = ctdb;
1115         params.recdata = recdata;
1116         params.len = offsetof(struct ctdb_control_pulldb_reply, data);
1117         params.failed = false;
1118
1119         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1120                 DEBUG(0,(__location__ " Failed to traverse recdb database\n"));
1121                 talloc_free(params.recdata);
1122                 talloc_free(tmp_ctx);
1123                 return -1;
1124         }
1125
1126         if (params.failed) {
1127                 DEBUG(0,(__location__ " Failed to traverse recdb database\n"));
1128                 talloc_free(params.recdata);
1129                 talloc_free(tmp_ctx);
1130                 return -1;              
1131         }
1132
1133         recdata = params.recdata;
1134
1135         outdata.dptr = (void *)recdata;
1136         outdata.dsize = params.len;
1137
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                         list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
1140                         CONTROL_TIMEOUT(), false, outdata) != 0) {
1141                 DEBUG(0,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             uint32_t pnn, 
1164                             struct ctdb_node_map *nodemap,
1165                             uint32_t transaction_id)
1166 {
1167         struct tdb_wrap *recdb;
1168         int ret;
1169         struct ctdb_context *ctdb = rec->ctdb;
1170         TDB_DATA data;
1171         struct ctdb_control_wipe_database w;
1172
1173         recdb = create_recdb(ctdb, mem_ctx);
1174         if (recdb == NULL) {
1175                 return -1;
1176         }
1177
1178         /* pull all remote databases onto the recdb */
1179         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1180         if (ret != 0) {
1181                 DEBUG(0, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1182                 return -1;
1183         }
1184
1185         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1186
1187         /* wipe all the remote databases. This is safe as we are in a transaction */
1188         w.db_id = dbid;
1189         w.transaction_id = transaction_id;
1190
1191         data.dptr = (void *)&w;
1192         data.dsize = sizeof(w);
1193
1194         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1195                         list_of_active_nodes(ctdb, nodemap, recdb, true),
1196                         CONTROL_TIMEOUT(), false, data) != 0) {
1197                 DEBUG(0, (__location__ " Unable to wipe database. Recovery failed.\n"));
1198                 talloc_free(recdb);
1199                 return -1;
1200         }
1201         
1202         /* push out the correct database. This sets the dmaster and skips 
1203            the empty records */
1204         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1205         if (ret != 0) {
1206                 talloc_free(recdb);
1207                 return -1;
1208         }
1209
1210         /* all done with this database */
1211         talloc_free(recdb);
1212
1213         return 0;
1214 }
1215
1216                 
1217 /*
1218   we are the recmaster, and recovery is needed - start a recovery run
1219  */
1220 static int do_recovery(struct ctdb_recoverd *rec, 
1221                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
1222                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1223                        uint32_t culprit)
1224 {
1225         struct ctdb_context *ctdb = rec->ctdb;
1226         int i, j, ret;
1227         uint32_t generation;
1228         struct ctdb_dbid_map *dbmap;
1229         TDB_DATA data;
1230
1231         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1232
1233         /* if recovery fails, force it again */
1234         rec->need_recovery = true;
1235
1236         ctdb_set_culprit(rec, culprit);
1237
1238         if (rec->culprit_counter > 2*nodemap->num) {
1239                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1240                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1241                          ctdb->tunable.recovery_ban_period));
1242                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1243         }
1244
1245         if (!ctdb_recovery_lock(ctdb, true)) {
1246                 ctdb_set_culprit(rec, pnn);
1247                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
1248                 return -1;
1249         }
1250
1251         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1252
1253         /* get a list of all databases */
1254         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1255         if (ret != 0) {
1256                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
1257                 return -1;
1258         }
1259
1260         /* we do the db creation before we set the recovery mode, so the freeze happens
1261            on all databases we will be dealing with. */
1262
1263         /* verify that we have all the databases any other node has */
1264         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1265         if (ret != 0) {
1266                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
1267                 return -1;
1268         }
1269
1270         /* verify that all other nodes have all our databases */
1271         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1272         if (ret != 0) {
1273                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
1274                 return -1;
1275         }
1276
1277         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1278
1279
1280         /* set recovery mode to active on all nodes */
1281         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1282         if (ret!=0) {
1283                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1284                 return -1;
1285         }
1286
1287         /* execute the "startrecovery" event script on all nodes */
1288         ret = run_startrecovery_eventscript(ctdb, nodemap);
1289         if (ret!=0) {
1290                 DEBUG(0, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1291                 return -1;
1292         }
1293
1294         /* pick a new generation number */
1295         generation = new_generation();
1296
1297         /* change the vnnmap on this node to use the new generation 
1298            number but not on any other nodes.
1299            this guarantees that if we abort the recovery prematurely
1300            for some reason (a node stops responding?)
1301            that we can just return immediately and we will reenter
1302            recovery shortly again.
1303            I.e. we deliberately leave the cluster with an inconsistent
1304            generation id to allow us to abort recovery at any stage and
1305            just restart it from scratch.
1306          */
1307         vnnmap->generation = generation;
1308         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1309         if (ret != 0) {
1310                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1311                 return -1;
1312         }
1313
1314         data.dptr = (void *)&generation;
1315         data.dsize = sizeof(uint32_t);
1316
1317         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1318                         list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
1319                         CONTROL_TIMEOUT(), false, data) != 0) {
1320                 DEBUG(0, (__location__ " Unable to start transactions. Recovery failed.\n"));
1321                 return -1;
1322         }
1323
1324         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1325
1326         for (i=0;i<dbmap->num;i++) {
1327                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1328                         DEBUG(0, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1329                         return -1;
1330                 }
1331         }
1332
1333         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1334
1335         /* commit all the changes */
1336         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1337                         list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
1338                         CONTROL_TIMEOUT(), false, data) != 0) {
1339                 DEBUG(0, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1340                 return -1;
1341         }
1342
1343         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1344         
1345
1346         /* build a new vnn map with all the currently active and
1347            unbanned nodes */
1348         generation = new_generation();
1349         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1350         CTDB_NO_MEMORY(ctdb, vnnmap);
1351         vnnmap->generation = generation;
1352         vnnmap->size = num_active;
1353         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1354         for (i=j=0;i<nodemap->num;i++) {
1355                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1356                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
1357                 }
1358         }
1359
1360         /* update to the new vnnmap on all nodes */
1361         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1362         if (ret != 0) {
1363                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
1364                 return -1;
1365         }
1366
1367         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1368
1369         /* update recmaster to point to us for all nodes */
1370         ret = set_recovery_master(ctdb, nodemap, pnn);
1371         if (ret!=0) {
1372                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
1373                 return -1;
1374         }
1375
1376         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1377
1378         /*
1379           update all nodes to have the same flags that we have
1380          */
1381         ret = update_flags_on_all_nodes(ctdb, nodemap);
1382         if (ret != 0) {
1383                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
1384                 return -1;
1385         }
1386         
1387         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1388
1389         /*
1390           if enabled, tell nodes to takeover their public IPs
1391          */
1392         if (ctdb->vnn) {
1393                 rec->need_takeover_run = false;
1394                 ret = ctdb_takeover_run(ctdb, nodemap);
1395                 if (ret != 0) {
1396                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1397                         return -1;
1398                 }
1399                 DEBUG(DEBUG_INFO, (__location__ " Recovery - done takeover\n"));
1400         }
1401
1402         /* execute the "recovered" event script on all nodes */
1403         ret = run_recovered_eventscript(ctdb, nodemap);
1404         if (ret!=0) {
1405                 DEBUG(0, (__location__ " Unable to run the 'recovered' event on cluster\n"));
1406                 return -1;
1407         }
1408
1409         /* disable recovery mode */
1410         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1411         if (ret!=0) {
1412                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1413                 return -1;
1414         }
1415
1416         /* send a message to all clients telling them that the cluster 
1417            has been reconfigured */
1418         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1419
1420         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1421
1422         rec->need_recovery = false;
1423
1424         /* We just finished a recovery successfully. 
1425            We now wait for rerecovery_timeout before we allow 
1426            another recovery to take place.
1427         */
1428         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1429         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1430         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1431
1432         return 0;
1433 }
1434
1435
1436 /*
1437   elections are won by first checking the number of connected nodes, then
1438   the priority time, then the pnn
1439  */
1440 struct election_message {
1441         uint32_t num_connected;
1442         struct timeval priority_time;
1443         uint32_t pnn;
1444         uint32_t node_flags;
1445 };
1446
1447 /*
1448   form this nodes election data
1449  */
1450 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1451 {
1452         int ret, i;
1453         struct ctdb_node_map *nodemap;
1454         struct ctdb_context *ctdb = rec->ctdb;
1455
1456         ZERO_STRUCTP(em);
1457
1458         em->pnn = rec->ctdb->pnn;
1459         em->priority_time = rec->priority_time;
1460         em->node_flags = rec->node_flags;
1461
1462         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1463         if (ret != 0) {
1464                 DEBUG(0,(__location__ " unable to get election data\n"));
1465                 return;
1466         }
1467
1468         for (i=0;i<nodemap->num;i++) {
1469                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1470                         em->num_connected++;
1471                 }
1472         }
1473         talloc_free(nodemap);
1474 }
1475
1476 /*
1477   see if the given election data wins
1478  */
1479 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1480 {
1481         struct election_message myem;
1482         int cmp = 0;
1483
1484         ctdb_election_data(rec, &myem);
1485
1486         /* we cant win if we are banned */
1487         if (rec->node_flags & NODE_FLAGS_BANNED) {
1488                 return false;
1489         }       
1490
1491         /* we will automatically win if the other node is banned */
1492         if (em->node_flags & NODE_FLAGS_BANNED) {
1493                 return true;
1494         }
1495
1496         /* try to use the most connected node */
1497         if (cmp == 0) {
1498                 cmp = (int)myem.num_connected - (int)em->num_connected;
1499         }
1500
1501         /* then the longest running node */
1502         if (cmp == 0) {
1503                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1504         }
1505
1506         if (cmp == 0) {
1507                 cmp = (int)myem.pnn - (int)em->pnn;
1508         }
1509
1510         return cmp > 0;
1511 }
1512
1513 /*
1514   send out an election request
1515  */
1516 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1517 {
1518         int ret;
1519         TDB_DATA election_data;
1520         struct election_message emsg;
1521         uint64_t srvid;
1522         struct ctdb_context *ctdb = rec->ctdb;
1523
1524         srvid = CTDB_SRVID_RECOVERY;
1525
1526         ctdb_election_data(rec, &emsg);
1527
1528         election_data.dsize = sizeof(struct election_message);
1529         election_data.dptr  = (unsigned char *)&emsg;
1530
1531
1532         /* first we assume we will win the election and set 
1533            recoverymaster to be ourself on the current node
1534          */
1535         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1536         if (ret != 0) {
1537                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1538                 return -1;
1539         }
1540
1541
1542         /* send an election message to all active nodes */
1543         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1544
1545         return 0;
1546 }
1547
1548 /*
1549   this function will unban all nodes in the cluster
1550 */
1551 static void unban_all_nodes(struct ctdb_context *ctdb)
1552 {
1553         int ret, i;
1554         struct ctdb_node_map *nodemap;
1555         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1556         
1557         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1558         if (ret != 0) {
1559                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1560                 return;
1561         }
1562
1563         for (i=0;i<nodemap->num;i++) {
1564                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1565                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1566                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1567                 }
1568         }
1569
1570         talloc_free(tmp_ctx);
1571 }
1572
1573
1574 /*
1575   we think we are winning the election - send a broadcast election request
1576  */
1577 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1578 {
1579         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1580         int ret;
1581
1582         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1583         if (ret != 0) {
1584                 DEBUG(0,("Failed to send election request!\n"));
1585         }
1586
1587         talloc_free(rec->send_election_te);
1588         rec->send_election_te = NULL;
1589 }
1590
1591 /*
1592   handler for recovery master elections
1593 */
1594 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1595                              TDB_DATA data, void *private_data)
1596 {
1597         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1598         int ret;
1599         struct election_message *em = (struct election_message *)data.dptr;
1600         TALLOC_CTX *mem_ctx;
1601
1602         /* we got an election packet - update the timeout for the election */
1603         talloc_free(rec->election_timeout);
1604         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1605                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1606                                                 ctdb_election_timeout, rec);
1607
1608         mem_ctx = talloc_new(ctdb);
1609
1610         /* someone called an election. check their election data
1611            and if we disagree and we would rather be the elected node, 
1612            send a new election message to all other nodes
1613          */
1614         if (ctdb_election_win(rec, em)) {
1615                 if (!rec->send_election_te) {
1616                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1617                                                                 timeval_current_ofs(0, 500000),
1618                                                                 election_send_request, rec);
1619                 }
1620                 talloc_free(mem_ctx);
1621                 /*unban_all_nodes(ctdb);*/
1622                 return;
1623         }
1624         
1625         /* we didn't win */
1626         talloc_free(rec->send_election_te);
1627         rec->send_election_te = NULL;
1628
1629         /* release the recmaster lock */
1630         if (em->pnn != ctdb->pnn &&
1631             ctdb->recovery_lock_fd != -1) {
1632                 close(ctdb->recovery_lock_fd);
1633                 ctdb->recovery_lock_fd = -1;
1634                 unban_all_nodes(ctdb);
1635         }
1636
1637         /* ok, let that guy become recmaster then */
1638         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1639         if (ret != 0) {
1640                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1641                 talloc_free(mem_ctx);
1642                 return;
1643         }
1644
1645         /* release any bans */
1646         rec->last_culprit = (uint32_t)-1;
1647         talloc_free(rec->banned_nodes);
1648         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1649         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1650
1651         talloc_free(mem_ctx);
1652         return;
1653 }
1654
1655
1656 /*
1657   force the start of the election process
1658  */
1659 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1660                            struct ctdb_node_map *nodemap)
1661 {
1662         int ret;
1663         struct ctdb_context *ctdb = rec->ctdb;
1664
1665         /* set all nodes to recovery mode to stop all internode traffic */
1666         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1667         if (ret!=0) {
1668                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1669                 return;
1670         }
1671
1672         talloc_free(rec->election_timeout);
1673         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1674                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1675                                                 ctdb_election_timeout, rec);
1676
1677         ret = send_election_request(rec, pnn);
1678         if (ret!=0) {
1679                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1680                 return;
1681         }
1682
1683         /* wait for a few seconds to collect all responses */
1684         ctdb_wait_election(rec);
1685 }
1686
1687
1688
1689 /*
1690   handler for when a node changes its flags
1691 */
1692 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1693                             TDB_DATA data, void *private_data)
1694 {
1695         int ret;
1696         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1697         struct ctdb_node_map *nodemap=NULL;
1698         TALLOC_CTX *tmp_ctx;
1699         uint32_t changed_flags;
1700         int i;
1701         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1702
1703         if (data.dsize != sizeof(*c)) {
1704                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1705                 return;
1706         }
1707
1708         tmp_ctx = talloc_new(ctdb);
1709         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1710
1711         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1712         if (ret != 0) {
1713                 DEBUG(0,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1714                 talloc_free(tmp_ctx);
1715                 return;         
1716         }
1717
1718
1719         for (i=0;i<nodemap->num;i++) {
1720                 if (nodemap->nodes[i].pnn == c->pnn) break;
1721         }
1722
1723         if (i == nodemap->num) {
1724                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1725                 talloc_free(tmp_ctx);
1726                 return;
1727         }
1728
1729         changed_flags = c->old_flags ^ c->new_flags;
1730
1731         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1732            This flag is handled locally based on whether the local node
1733            can communicate with the node or not.
1734         */
1735         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1736         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1737                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1738         }
1739
1740         if (nodemap->nodes[i].flags != c->new_flags) {
1741                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1742         }
1743
1744         nodemap->nodes[i].flags = c->new_flags;
1745
1746         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1747                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1748
1749         if (ret == 0) {
1750                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1751                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1752         }
1753         
1754         if (ret == 0 &&
1755             ctdb->recovery_master == ctdb->pnn &&
1756             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1757             ctdb->vnn) {
1758                 /* Only do the takeover run if the perm disabled or unhealthy
1759                    flags changed since these will cause an ip failover but not
1760                    a recovery.
1761                    If the node became disconnected or banned this will also
1762                    lead to an ip address failover but that is handled 
1763                    during recovery
1764                 */
1765                 if (changed_flags & NODE_FLAGS_DISABLED) {
1766                         rec->need_takeover_run = true;
1767                 }
1768         }
1769
1770         talloc_free(tmp_ctx);
1771 }
1772
1773
1774
1775 struct verify_recmode_normal_data {
1776         uint32_t count;
1777         enum monitor_result status;
1778 };
1779
1780 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1781 {
1782         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1783
1784
1785         /* one more node has responded with recmode data*/
1786         rmdata->count--;
1787
1788         /* if we failed to get the recmode, then return an error and let
1789            the main loop try again.
1790         */
1791         if (state->state != CTDB_CONTROL_DONE) {
1792                 if (rmdata->status == MONITOR_OK) {
1793                         rmdata->status = MONITOR_FAILED;
1794                 }
1795                 return;
1796         }
1797
1798         /* if we got a response, then the recmode will be stored in the
1799            status field
1800         */
1801         if (state->status != CTDB_RECOVERY_NORMAL) {
1802                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1803                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1804         }
1805
1806         return;
1807 }
1808
1809
1810 /* verify that all nodes are in normal recovery mode */
1811 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1812 {
1813         struct verify_recmode_normal_data *rmdata;
1814         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1815         struct ctdb_client_control_state *state;
1816         enum monitor_result status;
1817         int j;
1818         
1819         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1820         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1821         rmdata->count  = 0;
1822         rmdata->status = MONITOR_OK;
1823
1824         /* loop over all active nodes and send an async getrecmode call to 
1825            them*/
1826         for (j=0; j<nodemap->num; j++) {
1827                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1828                         continue;
1829                 }
1830                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1831                                         CONTROL_TIMEOUT(), 
1832                                         nodemap->nodes[j].pnn);
1833                 if (state == NULL) {
1834                         /* we failed to send the control, treat this as 
1835                            an error and try again next iteration
1836                         */                      
1837                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1838                         talloc_free(mem_ctx);
1839                         return MONITOR_FAILED;
1840                 }
1841
1842                 /* set up the callback functions */
1843                 state->async.fn = verify_recmode_normal_callback;
1844                 state->async.private_data = rmdata;
1845
1846                 /* one more control to wait for to complete */
1847                 rmdata->count++;
1848         }
1849
1850
1851         /* now wait for up to the maximum number of seconds allowed
1852            or until all nodes we expect a response from has replied
1853         */
1854         while (rmdata->count > 0) {
1855                 event_loop_once(ctdb->ev);
1856         }
1857
1858         status = rmdata->status;
1859         talloc_free(mem_ctx);
1860         return status;
1861 }
1862
1863
1864 struct verify_recmaster_data {
1865         uint32_t count;
1866         uint32_t pnn;
1867         enum monitor_result status;
1868 };
1869
1870 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1871 {
1872         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1873
1874
1875         /* one more node has responded with recmaster data*/
1876         rmdata->count--;
1877
1878         /* if we failed to get the recmaster, then return an error and let
1879            the main loop try again.
1880         */
1881         if (state->state != CTDB_CONTROL_DONE) {
1882                 if (rmdata->status == MONITOR_OK) {
1883                         rmdata->status = MONITOR_FAILED;
1884                 }
1885                 return;
1886         }
1887
1888         /* if we got a response, then the recmaster will be stored in the
1889            status field
1890         */
1891         if (state->status != rmdata->pnn) {
1892                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1893                 rmdata->status = MONITOR_ELECTION_NEEDED;
1894         }
1895
1896         return;
1897 }
1898
1899
1900 /* verify that all nodes agree that we are the recmaster */
1901 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1902 {
1903         struct verify_recmaster_data *rmdata;
1904         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1905         struct ctdb_client_control_state *state;
1906         enum monitor_result status;
1907         int j;
1908         
1909         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1910         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1911         rmdata->count  = 0;
1912         rmdata->pnn    = pnn;
1913         rmdata->status = MONITOR_OK;
1914
1915         /* loop over all active nodes and send an async getrecmaster call to 
1916            them*/
1917         for (j=0; j<nodemap->num; j++) {
1918                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1919                         continue;
1920                 }
1921                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1922                                         CONTROL_TIMEOUT(),
1923                                         nodemap->nodes[j].pnn);
1924                 if (state == NULL) {
1925                         /* we failed to send the control, treat this as 
1926                            an error and try again next iteration
1927                         */                      
1928                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1929                         talloc_free(mem_ctx);
1930                         return MONITOR_FAILED;
1931                 }
1932
1933                 /* set up the callback functions */
1934                 state->async.fn = verify_recmaster_callback;
1935                 state->async.private_data = rmdata;
1936
1937                 /* one more control to wait for to complete */
1938                 rmdata->count++;
1939         }
1940
1941
1942         /* now wait for up to the maximum number of seconds allowed
1943            or until all nodes we expect a response from has replied
1944         */
1945         while (rmdata->count > 0) {
1946                 event_loop_once(ctdb->ev);
1947         }
1948
1949         status = rmdata->status;
1950         talloc_free(mem_ctx);
1951         return status;
1952 }
1953
1954
1955 /*
1956   the main monitoring loop
1957  */
1958 static void monitor_cluster(struct ctdb_context *ctdb)
1959 {
1960         uint32_t pnn, num_active, recmaster;
1961         TALLOC_CTX *mem_ctx=NULL;
1962         struct ctdb_node_map *nodemap=NULL;
1963         struct ctdb_node_map *remote_nodemap=NULL;
1964         struct ctdb_vnn_map *vnnmap=NULL;
1965         struct ctdb_vnn_map *remote_vnnmap=NULL;
1966         int i, j, ret;
1967         struct ctdb_recoverd *rec;
1968         struct ctdb_all_public_ips *ips;
1969         char c;
1970
1971         DEBUG(0,("monitor_cluster starting\n"));
1972
1973         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1974         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1975
1976         rec->ctdb = ctdb;
1977         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1978         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1979
1980         rec->priority_time = timeval_current();
1981
1982         /* register a message port for recovery elections */
1983         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1984
1985         /* and one for when nodes are disabled/enabled */
1986         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1987
1988         /* and one for when nodes are banned */
1989         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1990
1991         /* and one for when nodes are unbanned */
1992         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1993
1994         /* register a message port for vacuum fetch */
1995         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
1996         
1997 again:
1998         if (mem_ctx) {
1999                 talloc_free(mem_ctx);
2000                 mem_ctx = NULL;
2001         }
2002         mem_ctx = talloc_new(ctdb);
2003         if (!mem_ctx) {
2004                 DEBUG(0,("Failed to create temporary context\n"));
2005                 exit(-1);
2006         }
2007
2008         /* we only check for recovery once every second */
2009         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2010
2011         /* verify that the main daemon is still running */
2012         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2013                 DEBUG(0,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2014                 exit(-1);
2015         }
2016
2017         if (rec->election_timeout) {
2018                 /* an election is in progress */
2019                 goto again;
2020         }
2021
2022
2023         /* We must check if we need to ban a node here but we want to do this
2024            as early as possible so we dont wait until we have pulled the node
2025            map from the local node. thats why we have the hardcoded value 20
2026         */
2027         if (rec->culprit_counter > 20) {
2028                 DEBUG(0,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2029                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2030                          ctdb->tunable.recovery_ban_period));
2031                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2032         }
2033
2034         /* get relevant tunables */
2035         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2036         if (ret != 0) {
2037                 DEBUG(0,("Failed to get tunables - retrying\n"));
2038                 goto again;
2039         }
2040
2041         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2042         if (pnn == (uint32_t)-1) {
2043                 DEBUG(0,("Failed to get local pnn - retrying\n"));
2044                 goto again;
2045         }
2046
2047         /* get the vnnmap */
2048         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2049         if (ret != 0) {
2050                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2051                 goto again;
2052         }
2053
2054
2055         /* get number of nodes */
2056         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
2057         if (ret != 0) {
2058                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
2059                 goto again;
2060         }
2061
2062         /* check which node is the recovery master */
2063         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
2064         if (ret != 0) {
2065                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
2066                 goto again;
2067         }
2068
2069         if (recmaster == (uint32_t)-1) {
2070                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
2071                 force_election(rec, mem_ctx, pnn, nodemap);
2072                 goto again;
2073         }
2074         
2075         /* check that we (recovery daemon) and the local ctdb daemon
2076            agrees on whether we are banned or not
2077         */
2078         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2079                 if (rec->banned_nodes[pnn] == NULL) {
2080                         if (recmaster == pnn) {
2081                                 DEBUG(0,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2082
2083                                 ctdb_unban_node(rec, pnn);
2084                         } else {
2085                                 DEBUG(0,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2086                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2087                                 ctdb_set_culprit(rec, pnn);
2088                         }
2089                         goto again;
2090                 }
2091         } else {
2092                 if (rec->banned_nodes[pnn] != NULL) {
2093                         if (recmaster == pnn) {
2094                                 DEBUG(0,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2095
2096                                 ctdb_unban_node(rec, pnn);
2097                         } else {
2098                                 DEBUG(0,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2099
2100                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2101                                 ctdb_set_culprit(rec, pnn);
2102                         }
2103                         goto again;
2104                 }
2105         }
2106
2107         /* remember our own node flags */
2108         rec->node_flags = nodemap->nodes[pnn].flags;
2109
2110         /* count how many active nodes there are */
2111         num_active = 0;
2112         for (i=0; i<nodemap->num; i++) {
2113                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2114                         num_active++;
2115                 }
2116         }
2117
2118
2119         /* verify that the recmaster node is still active */
2120         for (j=0; j<nodemap->num; j++) {
2121                 if (nodemap->nodes[j].pnn==recmaster) {
2122                         break;
2123                 }
2124         }
2125
2126         if (j == nodemap->num) {
2127                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
2128                 force_election(rec, mem_ctx, pnn, nodemap);
2129                 goto again;
2130         }
2131
2132         /* if recovery master is disconnected we must elect a new recmaster */
2133         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2134                 DEBUG(0, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2135                 force_election(rec, mem_ctx, pnn, nodemap);
2136                 goto again;
2137         }
2138
2139         /* grap the nodemap from the recovery master to check if it is banned */
2140         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2141                                    mem_ctx, &remote_nodemap);
2142         if (ret != 0) {
2143                 DEBUG(0, (__location__ " Unable to get nodemap from recovery master %u\n", 
2144                           nodemap->nodes[j].pnn));
2145                 goto again;
2146         }
2147
2148
2149         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2150                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2151                 force_election(rec, mem_ctx, pnn, nodemap);
2152                 goto again;
2153         }
2154
2155         /* verify that the public ip address allocation is consistent */
2156         if (ctdb->vnn != NULL) {
2157                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2158                 if (ret != 0) {
2159                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
2160                         goto again;
2161                 }
2162                 for (j=0; j<ips->num; j++) {
2163                         /* verify that we have the ip addresses we should have
2164                            and we dont have ones we shouldnt have.
2165                            if we find an inconsistency we set recmode to
2166                            active on the local node and wait for the recmaster
2167                            to do a full blown recovery
2168                         */
2169                         if (ips->ips[j].pnn == pnn) {
2170                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
2171                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2172                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2173                                         if (ret != 0) {
2174                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2175                                                 goto again;
2176                                         }
2177                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2178                                         if (ret != 0) {
2179                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2180                                                 goto again;
2181                                         }
2182                                 }
2183                         } else {
2184                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
2185                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2186                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2187                                         if (ret != 0) {
2188                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2189                                                 goto again;
2190                                         }
2191                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2192                                         if (ret != 0) {
2193                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2194                                                 goto again;
2195                                         }
2196                                 }
2197                         }
2198                 }
2199         }
2200
2201         /* if we are not the recmaster then we do not need to check
2202            if recovery is needed
2203          */
2204         if (pnn != recmaster) {
2205                 goto again;
2206         }
2207
2208
2209         /* ensure our local copies of flags are right */
2210         ret = update_local_flags(rec, nodemap);
2211         if (ret == MONITOR_ELECTION_NEEDED) {
2212                 DEBUG(0,("update_local_flags() called for a re-election.\n"));
2213                 force_election(rec, mem_ctx, pnn, nodemap);
2214                 goto again;
2215         }
2216         if (ret != MONITOR_OK) {
2217                 DEBUG(0,("Unable to update local flags\n"));
2218                 goto again;
2219         }
2220
2221         /* update the list of public ips that a node can handle for
2222            all connected nodes
2223         */
2224         for (j=0; j<nodemap->num; j++) {
2225                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2226                         continue;
2227                 }
2228                 /* release any existing data */
2229                 if (ctdb->nodes[j]->public_ips) {
2230                         talloc_free(ctdb->nodes[j]->public_ips);
2231                         ctdb->nodes[j]->public_ips = NULL;
2232                 }
2233                 /* grab a new shiny list of public ips from the node */
2234                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2235                         ctdb->nodes[j]->pnn, 
2236                         ctdb->nodes,
2237                         &ctdb->nodes[j]->public_ips)) {
2238                         DEBUG(0,("Failed to read public ips from node : %u\n", 
2239                                 ctdb->nodes[j]->pnn));
2240                         goto again;
2241                 }
2242         }
2243
2244
2245         /* verify that all active nodes agree that we are the recmaster */
2246         switch (verify_recmaster(ctdb, nodemap, pnn)) {
2247         case MONITOR_RECOVERY_NEEDED:
2248                 /* can not happen */
2249                 goto again;
2250         case MONITOR_ELECTION_NEEDED:
2251                 force_election(rec, mem_ctx, pnn, nodemap);
2252                 goto again;
2253         case MONITOR_OK:
2254                 break;
2255         case MONITOR_FAILED:
2256                 goto again;
2257         }
2258
2259
2260         if (rec->need_recovery) {
2261                 /* a previous recovery didn't finish */
2262                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2263                 goto again;             
2264         }
2265
2266         /* verify that all active nodes are in normal mode 
2267            and not in recovery mode 
2268          */
2269         switch (verify_recmode(ctdb, nodemap)) {
2270         case MONITOR_RECOVERY_NEEDED:
2271                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2272                 goto again;
2273         case MONITOR_FAILED:
2274                 goto again;
2275         case MONITOR_ELECTION_NEEDED:
2276                 /* can not happen */
2277         case MONITOR_OK:
2278                 break;
2279         }
2280
2281
2282         /* we should have the reclock - check its not stale */
2283         if (ctdb->recovery_lock_fd == -1) {
2284                 DEBUG(0,("recovery master doesn't have the recovery lock\n"));
2285                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2286                 goto again;
2287         }
2288
2289         if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
2290                 DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2291                 close(ctdb->recovery_lock_fd);
2292                 ctdb->recovery_lock_fd = -1;
2293                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2294                 goto again;
2295         }
2296
2297         /* get the nodemap for all active remote nodes and verify
2298            they are the same as for this node
2299          */
2300         for (j=0; j<nodemap->num; j++) {
2301                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2302                         continue;
2303                 }
2304                 if (nodemap->nodes[j].pnn == pnn) {
2305                         continue;
2306                 }
2307
2308                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2309                                            mem_ctx, &remote_nodemap);
2310                 if (ret != 0) {
2311                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
2312                                   nodemap->nodes[j].pnn));
2313                         goto again;
2314                 }
2315
2316                 /* if the nodes disagree on how many nodes there are
2317                    then this is a good reason to try recovery
2318                  */
2319                 if (remote_nodemap->num != nodemap->num) {
2320                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2321                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2322                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2323                         goto again;
2324                 }
2325
2326                 /* if the nodes disagree on which nodes exist and are
2327                    active, then that is also a good reason to do recovery
2328                  */
2329                 for (i=0;i<nodemap->num;i++) {
2330                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2331                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2332                                           nodemap->nodes[j].pnn, i, 
2333                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2334                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2335                                             vnnmap, nodemap->nodes[j].pnn);
2336                                 goto again;
2337                         }
2338                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2339                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2340                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2341                                           nodemap->nodes[j].pnn, i,
2342                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2343                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2344                                             vnnmap, nodemap->nodes[j].pnn);
2345                                 goto again;
2346                         }
2347                 }
2348
2349         }
2350
2351
2352         /* there better be the same number of lmasters in the vnn map
2353            as there are active nodes or we will have to do a recovery
2354          */
2355         if (vnnmap->size != num_active) {
2356                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2357                           vnnmap->size, num_active));
2358                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2359                 goto again;
2360         }
2361
2362         /* verify that all active nodes in the nodemap also exist in 
2363            the vnnmap.
2364          */
2365         for (j=0; j<nodemap->num; j++) {
2366                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2367                         continue;
2368                 }
2369                 if (nodemap->nodes[j].pnn == pnn) {
2370                         continue;
2371                 }
2372
2373                 for (i=0; i<vnnmap->size; i++) {
2374                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2375                                 break;
2376                         }
2377                 }
2378                 if (i == vnnmap->size) {
2379                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2380                                   nodemap->nodes[j].pnn));
2381                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2382                         goto again;
2383                 }
2384         }
2385
2386         
2387         /* verify that all other nodes have the same vnnmap
2388            and are from the same generation
2389          */
2390         for (j=0; j<nodemap->num; j++) {
2391                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2392                         continue;
2393                 }
2394                 if (nodemap->nodes[j].pnn == pnn) {
2395                         continue;
2396                 }
2397
2398                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2399                                           mem_ctx, &remote_vnnmap);
2400                 if (ret != 0) {
2401                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
2402                                   nodemap->nodes[j].pnn));
2403                         goto again;
2404                 }
2405
2406                 /* verify the vnnmap generation is the same */
2407                 if (vnnmap->generation != remote_vnnmap->generation) {
2408                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2409                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2410                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2411                         goto again;
2412                 }
2413
2414                 /* verify the vnnmap size is the same */
2415                 if (vnnmap->size != remote_vnnmap->size) {
2416                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2417                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2418                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2419                         goto again;
2420                 }
2421
2422                 /* verify the vnnmap is the same */
2423                 for (i=0;i<vnnmap->size;i++) {
2424                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2425                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
2426                                           nodemap->nodes[j].pnn));
2427                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2428                                             vnnmap, nodemap->nodes[j].pnn);
2429                                 goto again;
2430                         }
2431                 }
2432         }
2433
2434         /* we might need to change who has what IP assigned */
2435         if (rec->need_takeover_run) {
2436                 rec->need_takeover_run = false;
2437
2438                 /* execute the "startrecovery" event script on all nodes */
2439                 ret = run_startrecovery_eventscript(ctdb, nodemap);
2440                 if (ret!=0) {
2441                         DEBUG(0, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
2442                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2443                                     vnnmap, ctdb->pnn);
2444                 }
2445
2446                 ret = ctdb_takeover_run(ctdb, nodemap);
2447                 if (ret != 0) {
2448                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2449                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2450                                     vnnmap, ctdb->pnn);
2451                 }
2452
2453                 /* execute the "recovered" event script on all nodes */
2454                 ret = run_recovered_eventscript(ctdb, nodemap);
2455                 if (ret!=0) {
2456                         DEBUG(0, (__location__ " Unable to run the 'recovered' event on cluster\n"));
2457                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2458                                     vnnmap, ctdb->pnn);
2459                 }
2460         }
2461
2462         goto again;
2463
2464 }
2465
2466 /*
2467   event handler for when the main ctdbd dies
2468  */
2469 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2470                                  uint16_t flags, void *private_data)
2471 {
2472         DEBUG(0,("recovery daemon parent died - exiting\n"));
2473         _exit(1);
2474 }
2475
2476 /*
2477   startup the recovery daemon as a child of the main ctdb daemon
2478  */
2479 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2480 {
2481         int ret;
2482         int fd[2];
2483
2484         if (pipe(fd) != 0) {
2485                 return -1;
2486         }
2487
2488         ctdb->ctdbd_pid = getpid();
2489
2490         ctdb->recoverd_pid = fork();
2491         if (ctdb->recoverd_pid == -1) {
2492                 return -1;
2493         }
2494         
2495         if (ctdb->recoverd_pid != 0) {
2496                 close(fd[0]);
2497                 return 0;
2498         }
2499
2500         close(fd[1]);
2501
2502         /* shutdown the transport */
2503         ctdb->methods->shutdown(ctdb);
2504
2505         /* get a new event context */
2506         talloc_free(ctdb->ev);
2507         ctdb->ev = event_context_init(ctdb);
2508
2509         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2510                      ctdb_recoverd_parent, &fd[0]);     
2511
2512         close(ctdb->daemon.sd);
2513         ctdb->daemon.sd = -1;
2514
2515         srandom(getpid() ^ time(NULL));
2516
2517         /* the recovery daemon does not need to be realtime */
2518         if (ctdb->do_setsched) {
2519                 ctdb_restore_scheduler(ctdb);
2520         }
2521
2522         /* initialise ctdb */
2523         ret = ctdb_socket_connect(ctdb);
2524         if (ret != 0) {
2525                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
2526                 exit(1);
2527         }
2528
2529         monitor_cluster(ctdb);
2530
2531         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
2532         return -1;
2533 }
2534
2535 /*
2536   shutdown the recovery daemon
2537  */
2538 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2539 {
2540         if (ctdb->recoverd_pid == 0) {
2541                 return;
2542         }
2543
2544         DEBUG(0,("Shutting down recovery daemon\n"));
2545         kill(ctdb->recoverd_pid, SIGTERM);
2546 }