merge from tridge
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         uint32_t last_culprit;
45         uint32_t culprit_counter;
46         struct timeval first_recover_time;
47         struct ban_state **banned_nodes;
48         struct timeval priority_time;
49         bool need_takeover_run;
50         bool need_recovery;
51         uint32_t node_flags;
52         struct timed_event *send_election_te;
53         struct timed_event *election_timeout;
54         struct vacuum_info *vacuum_info;
55 };
56
57 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
58 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
59
60
61 struct async_data {
62         uint32_t count;
63         uint32_t fail_count;
64 };
65
66 static void async_callback(struct ctdb_client_control_state *state)
67 {
68         struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
69         int ret;
70         int32_t res;
71
72         /* one more node has responded with recmode data */
73         data->count--;
74
75         /* if we failed to push the db, then return an error and let
76            the main loop try again.
77         */
78         if (state->state != CTDB_CONTROL_DONE) {
79                 DEBUG(0,("Async operation failed with state %d\n", state->state));
80                 data->fail_count++;
81                 return;
82         }
83         
84         state->async.fn = NULL;
85
86         ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
87         if ((ret != 0) || (res != 0)) {
88                 DEBUG(0,("Async operation failed with ret=%d res=%d\n", ret, (int)res));
89                 data->fail_count++;
90         }
91 }
92
93
94 static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
95 {
96         /* set up the callback functions */
97         state->async.fn = async_callback;
98         state->async.private_data = data;
99         
100         /* one more control to wait for to complete */
101         data->count++;
102 }
103
104
105 /* wait for up to the maximum number of seconds allowed
106    or until all nodes we expect a response from has replied
107 */
108 static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
109 {
110         while (data->count > 0) {
111                 event_loop_once(ctdb->ev);
112         }
113         if (data->fail_count != 0) {
114                 DEBUG(0,("Async wait failed - fail_count=%u\n", data->fail_count));
115                 return -1;
116         }
117         return 0;
118 }
119
120
121 /*
122   unban a node
123  */
124 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
125 {
126         struct ctdb_context *ctdb = rec->ctdb;
127
128         DEBUG(0,("Unbanning node %u\n", pnn));
129
130         if (!ctdb_validate_pnn(ctdb, pnn)) {
131                 DEBUG(0,("Bad pnn %u in ctdb_unban_node\n", pnn));
132                 return;
133         }
134
135         /* If we are unbanning a different node then just pass the ban info on */
136         if (pnn != ctdb->pnn) {
137                 TDB_DATA data;
138                 int ret;
139                 
140                 DEBUG(0,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
141
142                 data.dptr = (uint8_t *)&pnn;
143                 data.dsize = sizeof(uint32_t);
144
145                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
146                 if (ret != 0) {
147                         DEBUG(0,("Failed to unban node %u\n", pnn));
148                         return;
149                 }
150
151                 return;
152         }
153
154         /* make sure we remember we are no longer banned in case 
155            there is an election */
156         rec->node_flags &= ~NODE_FLAGS_BANNED;
157
158         DEBUG(0,("Clearing ban flag on node %u\n", pnn));
159         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
160
161         if (rec->banned_nodes[pnn] == NULL) {
162                 DEBUG(0,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
163                 return;
164         }
165
166         talloc_free(rec->banned_nodes[pnn]);
167         rec->banned_nodes[pnn] = NULL;
168 }
169
170
171 /*
172   called when a ban has timed out
173  */
174 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
175 {
176         struct ban_state *state = talloc_get_type(p, struct ban_state);
177         struct ctdb_recoverd *rec = state->rec;
178         uint32_t pnn = state->banned_node;
179
180         DEBUG(0,("Ban timeout. Node %u is now unbanned\n", pnn));
181         ctdb_unban_node(rec, pnn);
182 }
183
184 /*
185   ban a node for a period of time
186  */
187 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
188 {
189         struct ctdb_context *ctdb = rec->ctdb;
190
191         DEBUG(0,("Banning node %u for %u seconds\n", pnn, ban_time));
192
193         if (!ctdb_validate_pnn(ctdb, pnn)) {
194                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
195                 return;
196         }
197
198         if (0 == ctdb->tunable.enable_bans) {
199                 DEBUG(0,("Bans are disabled - ignoring ban of node %u\n", pnn));
200                 return;
201         }
202
203         /* If we are banning a different node then just pass the ban info on */
204         if (pnn != ctdb->pnn) {
205                 struct ctdb_ban_info b;
206                 TDB_DATA data;
207                 int ret;
208                 
209                 DEBUG(0,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
210
211                 b.pnn = pnn;
212                 b.ban_time = ban_time;
213
214                 data.dptr = (uint8_t *)&b;
215                 data.dsize = sizeof(b);
216
217                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
218                 if (ret != 0) {
219                         DEBUG(0,("Failed to ban node %u\n", pnn));
220                         return;
221                 }
222
223                 return;
224         }
225
226         DEBUG(0,("self ban - lowering our election priority\n"));
227         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
228
229         /* banning ourselves - lower our election priority */
230         rec->priority_time = timeval_current();
231
232         /* make sure we remember we are banned in case there is an 
233            election */
234         rec->node_flags |= NODE_FLAGS_BANNED;
235
236         if (rec->banned_nodes[pnn] != NULL) {
237                 DEBUG(0,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));               
238                 talloc_free(rec->banned_nodes[pnn]);
239                 rec->banned_nodes[pnn] = NULL;
240         }
241
242         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
243         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
244
245         rec->banned_nodes[pnn]->rec = rec;
246         rec->banned_nodes[pnn]->banned_node = pnn;
247
248         if (ban_time != 0) {
249                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
250                                 timeval_current_ofs(ban_time, 0),
251                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
252         }
253 }
254
255 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
256
257
258 /* 
259    perform a simple control on all active nodes. The control cannot return data
260  */
261 static int async_control_on_active_nodes(struct ctdb_context *ctdb, enum ctdb_controls opcode,
262                                          struct ctdb_node_map *nodemap, TDB_DATA data, bool include_self)
263 {
264         struct async_data *async_data;
265         struct ctdb_client_control_state *state;
266         int j;
267         struct timeval timeout = CONTROL_TIMEOUT();
268         
269         async_data = talloc_zero(ctdb, struct async_data);
270         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
271
272         /* loop over all active nodes and send an async control to each of them */
273         for (j=0; j<nodemap->num; j++) {
274                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
275                         continue;
276                 }
277                 if (nodemap->nodes[j].pnn == ctdb->pnn && !include_self) {
278                         continue;
279                 }
280                 state = ctdb_control_send(ctdb, nodemap->nodes[j].pnn, 0, opcode, 
281                                           0, data, async_data, NULL, &timeout, NULL);
282                 if (state == NULL) {
283                         DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
284                         talloc_free(async_data);
285                         return -1;
286                 }
287                 
288                 async_add(async_data, state);
289         }
290
291         if (async_wait(ctdb, async_data) != 0) {
292                 DEBUG(0,(__location__ " Failed async control %u\n", (unsigned)opcode));
293                 talloc_free(async_data);
294                 return -1;
295         }
296
297         talloc_free(async_data);
298         return 0;
299 }
300
301
302
303 /*
304   change recovery mode on all nodes
305  */
306 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
307 {
308         TDB_DATA data;
309
310         /* freeze all nodes */
311         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
312                 if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_FREEZE, 
313                                                   nodemap, tdb_null, true) != 0) {
314                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
315                         return -1;
316                 }
317         }
318
319
320         data.dsize = sizeof(uint32_t);
321         data.dptr = (unsigned char *)&rec_mode;
322
323         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_SET_RECMODE, 
324                                           nodemap, data, true) != 0) {
325                 DEBUG(0, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
326                 return -1;
327         }
328
329         if (rec_mode == CTDB_RECOVERY_NORMAL) {
330                 if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_THAW, 
331                                                   nodemap, tdb_null, true) != 0) {
332                         DEBUG(0, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
333                         return -1;
334                 }
335         }
336
337         return 0;
338 }
339
340 /*
341   change recovery master on all node
342  */
343 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
344 {
345         TDB_DATA data;
346
347         data.dsize = sizeof(uint32_t);
348         data.dptr = (unsigned char *)&pnn;
349
350         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_SET_RECMASTER, 
351                                           nodemap, data, true) != 0) {
352                 DEBUG(0, (__location__ " Unable to set recmaster. Recovery failed.\n"));
353                 return -1;
354         }
355
356         return 0;
357 }
358
359
360 /*
361   ensure all other nodes have attached to any databases that we have
362  */
363 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
364                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
365 {
366         int i, j, db, ret;
367         struct ctdb_dbid_map *remote_dbmap;
368
369         /* verify that all other nodes have all our databases */
370         for (j=0; j<nodemap->num; j++) {
371                 /* we dont need to ourself ourselves */
372                 if (nodemap->nodes[j].pnn == pnn) {
373                         continue;
374                 }
375                 /* dont check nodes that are unavailable */
376                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
377                         continue;
378                 }
379
380                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
381                                          mem_ctx, &remote_dbmap);
382                 if (ret != 0) {
383                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
384                         return -1;
385                 }
386
387                 /* step through all local databases */
388                 for (db=0; db<dbmap->num;db++) {
389                         const char *name;
390
391
392                         for (i=0;i<remote_dbmap->num;i++) {
393                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
394                                         break;
395                                 }
396                         }
397                         /* the remote node already have this database */
398                         if (i!=remote_dbmap->num) {
399                                 continue;
400                         }
401                         /* ok so we need to create this database */
402                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
403                                             mem_ctx, &name);
404                         if (ret != 0) {
405                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
406                                 return -1;
407                         }
408                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
409                                            mem_ctx, name, dbmap->dbs[db].persistent);
410                         if (ret != 0) {
411                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
412                                 return -1;
413                         }
414                 }
415         }
416
417         return 0;
418 }
419
420
421 /*
422   ensure we are attached to any databases that anyone else is attached to
423  */
424 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
425                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
426 {
427         int i, j, db, ret;
428         struct ctdb_dbid_map *remote_dbmap;
429
430         /* verify that we have all database any other node has */
431         for (j=0; j<nodemap->num; j++) {
432                 /* we dont need to ourself ourselves */
433                 if (nodemap->nodes[j].pnn == pnn) {
434                         continue;
435                 }
436                 /* dont check nodes that are unavailable */
437                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
438                         continue;
439                 }
440
441                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                          mem_ctx, &remote_dbmap);
443                 if (ret != 0) {
444                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
445                         return -1;
446                 }
447
448                 /* step through all databases on the remote node */
449                 for (db=0; db<remote_dbmap->num;db++) {
450                         const char *name;
451
452                         for (i=0;i<(*dbmap)->num;i++) {
453                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
454                                         break;
455                                 }
456                         }
457                         /* we already have this db locally */
458                         if (i!=(*dbmap)->num) {
459                                 continue;
460                         }
461                         /* ok so we need to create this database and
462                            rebuild dbmap
463                          */
464                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
465                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
466                         if (ret != 0) {
467                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
468                                           nodemap->nodes[j].pnn));
469                                 return -1;
470                         }
471                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
472                                            remote_dbmap->dbs[db].persistent);
473                         if (ret != 0) {
474                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
475                                 return -1;
476                         }
477                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
478                         if (ret != 0) {
479                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
480                                 return -1;
481                         }
482                 }
483         }
484
485         return 0;
486 }
487
488
489 /*
490   pull the remote database contents from one node into the recdb
491  */
492 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
493                                     struct tdb_wrap *recdb, uint32_t dbid)
494 {
495         int ret;
496         TDB_DATA outdata;
497         struct ctdb_control_pulldb_reply *reply;
498         struct ctdb_rec_data *rec;
499         int i;
500         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
501
502         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
503                                CONTROL_TIMEOUT(), &outdata);
504         if (ret != 0) {
505                 DEBUG(0,(__location__ " Unable to copy db from node %u\n", srcnode));
506                 talloc_free(tmp_ctx);
507                 return -1;
508         }
509
510         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
511
512         if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
513                 DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
514                 talloc_free(tmp_ctx);
515                 return -1;
516         }
517         
518         rec = (struct ctdb_rec_data *)&reply->data[0];
519         
520         for (i=0;
521              i<reply->count;
522              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
523                 TDB_DATA key, data;
524                 struct ctdb_ltdb_header *hdr;
525                 TDB_DATA existing;
526                 
527                 key.dptr = &rec->data[0];
528                 key.dsize = rec->keylen;
529                 data.dptr = &rec->data[key.dsize];
530                 data.dsize = rec->datalen;
531                 
532                 hdr = (struct ctdb_ltdb_header *)data.dptr;
533
534                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
535                         DEBUG(0,(__location__ " bad ltdb record\n"));
536                         talloc_free(tmp_ctx);
537                         return -1;
538                 }
539
540                 /* fetch the existing record, if any */
541                 existing = tdb_fetch(recdb->tdb, key);
542                 
543                 if (existing.dptr != NULL) {
544                         struct ctdb_ltdb_header header;
545                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
546                                 DEBUG(0,(__location__ " Bad record size %u from node %u\n", 
547                                          (unsigned)existing.dsize, srcnode));
548                                 free(existing.dptr);
549                                 talloc_free(tmp_ctx);
550                                 return -1;
551                         }
552                         header = *(struct ctdb_ltdb_header *)existing.dptr;
553                         free(existing.dptr);
554                         if (!(header.rsn < hdr->rsn ||
555                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
556                                 continue;
557                         }
558                 }
559                 
560                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
561                         DEBUG(0,(__location__ " Failed to store record\n"));
562                         talloc_free(tmp_ctx);
563                         return -1;                              
564                 }
565         }
566
567         talloc_free(tmp_ctx);
568
569         return 0;
570 }
571
572 /*
573   pull all the remote database contents into the recdb
574  */
575 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
576                                 struct tdb_wrap *recdb, uint32_t dbid)
577 {
578         int j;
579
580         /* pull all records from all other nodes across onto this node
581            (this merges based on rsn)
582         */
583         for (j=0; j<nodemap->num; j++) {
584                 /* dont merge from nodes that are unavailable */
585                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
586                         continue;
587                 }
588                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
589                         DEBUG(0,(__location__ " Failed to pull remote database from node %u\n", 
590                                  nodemap->nodes[j].pnn));
591                         return -1;
592                 }
593         }
594         
595         return 0;
596 }
597
598
599 /*
600   update flags on all active nodes
601  */
602 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
603 {
604         int i;
605         for (i=0;i<nodemap->num;i++) {
606                 struct ctdb_node_flag_change c;
607                 TDB_DATA data;
608
609                 c.pnn = nodemap->nodes[i].pnn;
610                 c.old_flags = nodemap->nodes[i].flags;
611                 c.new_flags = nodemap->nodes[i].flags;
612
613                 data.dptr = (uint8_t *)&c;
614                 data.dsize = sizeof(c);
615
616                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
617                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
618
619         }
620         return 0;
621 }
622
623
624 /*
625   ensure all nodes have the same vnnmap we do
626  */
627 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
628                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
629 {
630         int j, ret;
631
632         /* push the new vnn map out to all the nodes */
633         for (j=0; j<nodemap->num; j++) {
634                 /* dont push to nodes that are unavailable */
635                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
636                         continue;
637                 }
638
639                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
640                 if (ret != 0) {
641                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
642                         return -1;
643                 }
644         }
645
646         return 0;
647 }
648
649
650 /*
651   handler for when the admin bans a node
652 */
653 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
654                         TDB_DATA data, void *private_data)
655 {
656         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
657         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
658         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
659
660         if (data.dsize != sizeof(*b)) {
661                 DEBUG(0,("Bad data in ban_handler\n"));
662                 talloc_free(mem_ctx);
663                 return;
664         }
665
666         if (b->pnn != ctdb->pnn) {
667                 DEBUG(0,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
668                 return;
669         }
670
671         DEBUG(0,("Node %u has been banned for %u seconds\n", 
672                  b->pnn, b->ban_time));
673
674         ctdb_ban_node(rec, b->pnn, b->ban_time);
675         talloc_free(mem_ctx);
676 }
677
678 /*
679   handler for when the admin unbans a node
680 */
681 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
682                           TDB_DATA data, void *private_data)
683 {
684         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
685         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
686         uint32_t pnn;
687
688         if (data.dsize != sizeof(uint32_t)) {
689                 DEBUG(0,("Bad data in unban_handler\n"));
690                 talloc_free(mem_ctx);
691                 return;
692         }
693         pnn = *(uint32_t *)data.dptr;
694
695         if (pnn != ctdb->pnn) {
696                 DEBUG(0,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
697                 return;
698         }
699
700         DEBUG(0,("Node %u has been unbanned.\n", pnn));
701         ctdb_unban_node(rec, pnn);
702         talloc_free(mem_ctx);
703 }
704
705
706 struct vacuum_info {
707         struct vacuum_info *next, *prev;
708         struct ctdb_recoverd *rec;
709         uint32_t srcnode;
710         struct ctdb_db_context *ctdb_db;
711         struct ctdb_control_pulldb_reply *recs;
712         struct ctdb_rec_data *r;
713 };
714
715 static void vacuum_fetch_next(struct vacuum_info *v);
716
717 /*
718   called when a vacuum fetch has completed - just free it and do the next one
719  */
720 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
721 {
722         struct vacuum_info *v = talloc_get_type(state->async.private, struct vacuum_info);
723         talloc_free(state);
724         vacuum_fetch_next(v);
725 }
726
727
728 /*
729   process the next element from the vacuum list
730 */
731 static void vacuum_fetch_next(struct vacuum_info *v)
732 {
733         struct ctdb_call call;
734         struct ctdb_rec_data *r;
735
736         while (v->recs->count) {
737                 struct ctdb_client_call_state *state;
738                 TDB_DATA data;
739                 struct ctdb_ltdb_header *hdr;
740
741                 ZERO_STRUCT(call);
742                 call.call_id = CTDB_NULL_FUNC;
743                 call.flags = CTDB_IMMEDIATE_MIGRATION;
744
745                 r = v->r;
746                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
747                 v->recs->count--;
748
749                 call.key.dptr = &r->data[0];
750                 call.key.dsize = r->keylen;
751
752                 /* ensure we don't block this daemon - just skip a record if we can't get
753                    the chainlock */
754                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
755                         continue;
756                 }
757
758                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
759                 if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
760                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
761                         continue;
762                 }
763                 
764                 hdr = (struct ctdb_ltdb_header *)data.dptr;
765                 if (hdr->dmaster == v->rec->ctdb->pnn) {
766                         /* its already local */
767                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
768                         continue;
769                 }
770
771                 state = ctdb_call_send(v->ctdb_db, &call);
772                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
773                 if (state == NULL) {
774                         DEBUG(0,(__location__ " Failed to setup vacuum fetch call\n"));
775                         talloc_free(v);
776                         return;
777                 }
778                 state->async.fn = vacuum_fetch_callback;
779                 state->async.private = v;
780                 return;
781         }
782
783         talloc_free(v);
784 }
785
786
787 /*
788   destroy a vacuum info structure
789  */
790 static int vacuum_info_destructor(struct vacuum_info *v)
791 {
792         DLIST_REMOVE(v->rec->vacuum_info, v);
793         return 0;
794 }
795
796
797 /*
798   handler for vacuum fetch
799 */
800 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
801                                  TDB_DATA data, void *private_data)
802 {
803         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
804         struct ctdb_control_pulldb_reply *recs;
805         int ret, i;
806         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
807         const char *name;
808         struct ctdb_dbid_map *dbmap=NULL;
809         bool persistent = false;
810         struct ctdb_db_context *ctdb_db;
811         struct ctdb_rec_data *r;
812         uint32_t srcnode;
813         struct vacuum_info *v;
814
815         recs = (struct ctdb_control_pulldb_reply *)data.dptr;
816         r = (struct ctdb_rec_data *)&recs->data[0];
817
818         if (recs->count == 0) {
819                 return;
820         }
821
822         srcnode = r->reqid;
823
824         for (v=rec->vacuum_info;v;v=v->next) {
825                 if (srcnode == v->srcnode) {
826                         /* we're already working on records from this node */
827                         return;
828                 }
829         }
830
831         /* work out if the database is persistent */
832         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
833         if (ret != 0) {
834                 DEBUG(0, (__location__ " Unable to get dbids from local node\n"));
835                 talloc_free(tmp_ctx);
836                 return;
837         }
838
839         for (i=0;i<dbmap->num;i++) {
840                 if (dbmap->dbs[i].dbid == recs->db_id) {
841                         persistent = dbmap->dbs[i].persistent;
842                         break;
843                 }
844         }
845         if (i == dbmap->num) {
846                 DEBUG(0, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
847                 talloc_free(tmp_ctx);
848                 return;         
849         }
850
851         /* find the name of this database */
852         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
853                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
854                 talloc_free(tmp_ctx);
855                 return;
856         }
857
858         /* attach to it */
859         ctdb_db = ctdb_attach(ctdb, name, persistent);
860         if (ctdb_db == NULL) {
861                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
862                 talloc_free(tmp_ctx);
863                 return;
864         }
865
866         v = talloc_zero(rec, struct vacuum_info);
867         if (v == NULL) {
868                 DEBUG(0,(__location__ " Out of memory\n"));
869                 return;
870         }
871
872         v->rec = rec;
873         v->srcnode = srcnode;
874         v->ctdb_db = ctdb_db;
875         v->recs = talloc_memdup(v, recs, data.dsize);
876         if (v->recs == NULL) {
877                 DEBUG(0,(__location__ " Out of memory\n"));
878                 talloc_free(v);
879                 return;         
880         }
881         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
882
883         DLIST_ADD(rec->vacuum_info, v);
884
885         talloc_set_destructor(v, vacuum_info_destructor);
886
887         vacuum_fetch_next(v);
888 }
889
890
891 /*
892   called when ctdb_wait_timeout should finish
893  */
894 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
895                               struct timeval yt, void *p)
896 {
897         uint32_t *timed_out = (uint32_t *)p;
898         (*timed_out) = 1;
899 }
900
901 /*
902   wait for a given number of seconds
903  */
904 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
905 {
906         uint32_t timed_out = 0;
907         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
908         while (!timed_out) {
909                 event_loop_once(ctdb->ev);
910         }
911 }
912
913 /*
914   called when an election times out (ends)
915  */
916 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
917                                   struct timeval t, void *p)
918 {
919         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
920         rec->election_timeout = NULL;
921 }
922
923
924 /*
925   wait for an election to finish. It finished election_timeout seconds after
926   the last election packet is received
927  */
928 static void ctdb_wait_election(struct ctdb_recoverd *rec)
929 {
930         struct ctdb_context *ctdb = rec->ctdb;
931         while (rec->election_timeout) {
932                 event_loop_once(ctdb->ev);
933         }
934 }
935
936 /*
937   remember the trouble maker
938  */
939 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
940 {
941         struct ctdb_context *ctdb = rec->ctdb;
942
943         if (rec->last_culprit != culprit ||
944             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
945                 DEBUG(0,("New recovery culprit %u\n", culprit));
946                 /* either a new node is the culprit, or we've decided to forgive them */
947                 rec->last_culprit = culprit;
948                 rec->first_recover_time = timeval_current();
949                 rec->culprit_counter = 0;
950         }
951         rec->culprit_counter++;
952 }
953
954 /*
955   Update our local flags from all remote connected nodes. 
956   This is only run when we are or we belive we are the recovery master
957  */
958 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
959 {
960         int j;
961         struct ctdb_context *ctdb = rec->ctdb;
962         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
963
964         /* get the nodemap for all active remote nodes and verify
965            they are the same as for this node
966          */
967         for (j=0; j<nodemap->num; j++) {
968                 struct ctdb_node_map *remote_nodemap=NULL;
969                 int ret;
970
971                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
972                         continue;
973                 }
974                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
975                         continue;
976                 }
977
978                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
979                                            mem_ctx, &remote_nodemap);
980                 if (ret != 0) {
981                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
982                                   nodemap->nodes[j].pnn));
983                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
984                         talloc_free(mem_ctx);
985                         return MONITOR_FAILED;
986                 }
987                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
988                         struct ctdb_node_flag_change c;
989                         TDB_DATA data;
990
991                         /* We should tell our daemon about this so it
992                            updates its flags or else we will log the same 
993                            message again in the next iteration of recovery.
994                            Since we are the recovery master we can just as
995                            well update the flags on all nodes.
996                         */
997                         c.pnn = nodemap->nodes[j].pnn;
998                         c.old_flags = nodemap->nodes[j].flags;
999                         c.new_flags = remote_nodemap->nodes[j].flags;
1000
1001                         data.dptr = (uint8_t *)&c;
1002                         data.dsize = sizeof(c);
1003
1004                         ctdb_send_message(ctdb, ctdb->pnn,
1005                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
1006                                         data);
1007
1008                         /* Update our local copy of the flags in the recovery
1009                            daemon.
1010                         */
1011                         DEBUG(0,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1012                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1013                                  nodemap->nodes[j].flags));
1014                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1015
1016                         /* If the BANNED flag has changed for the node
1017                            this is a good reason to do a new election.
1018                          */
1019                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
1020                                 DEBUG(0,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1021                                  nodemap->nodes[j].pnn, c.new_flags,
1022                                  c.old_flags));
1023                                 talloc_free(mem_ctx);
1024                                 return MONITOR_ELECTION_NEEDED;
1025                         }
1026
1027                 }
1028                 talloc_free(remote_nodemap);
1029         }
1030         talloc_free(mem_ctx);
1031         return MONITOR_OK;
1032 }
1033
1034
1035 /* Create a new random generation ip. 
1036    The generation id can not be the INVALID_GENERATION id
1037 */
1038 static uint32_t new_generation(void)
1039 {
1040         uint32_t generation;
1041
1042         while (1) {
1043                 generation = random();
1044
1045                 if (generation != INVALID_GENERATION) {
1046                         break;
1047                 }
1048         }
1049
1050         return generation;
1051 }
1052
1053
1054 /*
1055   create a temporary working database
1056  */
1057 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1058 {
1059         char *name;
1060         struct tdb_wrap *recdb;
1061
1062         /* open up the temporary recovery database */
1063         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1064         if (name == NULL) {
1065                 return NULL;
1066         }
1067         unlink(name);
1068         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1069                               TDB_NOLOCK, O_RDWR|O_CREAT|O_EXCL, 0600);
1070         if (recdb == NULL) {
1071                 DEBUG(0,(__location__ " Failed to create temp recovery database '%s'\n", name));
1072         }
1073
1074         talloc_free(name);
1075
1076         return recdb;
1077 }
1078
1079
1080 /* 
1081    a traverse function for pulling all relevent records from recdb
1082  */
1083 struct recdb_data {
1084         struct ctdb_context *ctdb;
1085         struct ctdb_control_pulldb_reply *recdata;
1086         uint32_t len;
1087         bool failed;
1088 };
1089
1090 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1091 {
1092         struct recdb_data *params = (struct recdb_data *)p;
1093         struct ctdb_rec_data *rec;
1094         struct ctdb_ltdb_header *hdr;
1095
1096         /* skip empty records */
1097         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1098                 return 0;
1099         }
1100
1101         /* update the dmaster field to point to us */
1102         hdr = (struct ctdb_ltdb_header *)data.dptr;
1103         hdr->dmaster = params->ctdb->pnn;
1104
1105         /* add the record to the blob ready to send to the nodes */
1106         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1107         if (rec == NULL) {
1108                 params->failed = true;
1109                 return -1;
1110         }
1111         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1112         if (params->recdata == NULL) {
1113                 DEBUG(0,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1114                          rec->length + params->len, params->recdata->count));
1115                 params->failed = true;
1116                 return -1;
1117         }
1118         params->recdata->count++;
1119         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1120         params->len += rec->length;
1121         talloc_free(rec);
1122
1123         return 0;
1124 }
1125
1126 /*
1127   push the recdb database out to all nodes
1128  */
1129 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1130                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1131 {
1132         struct recdb_data params;
1133         struct ctdb_control_pulldb_reply *recdata;
1134         TDB_DATA outdata;
1135
1136         recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
1137         CTDB_NO_MEMORY(ctdb, recdata);
1138
1139         recdata->db_id = dbid;
1140
1141         params.ctdb = ctdb;
1142         params.recdata = recdata;
1143         params.len = offsetof(struct ctdb_control_pulldb_reply, data);
1144         params.failed = false;
1145
1146         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1147                 DEBUG(0,(__location__ " Failed to traverse recdb database\n"));
1148                 talloc_free(params.recdata);
1149                 return -1;
1150         }
1151
1152         if (params.failed) {
1153                 DEBUG(0,(__location__ " Failed to traverse recdb database\n"));
1154                 talloc_free(params.recdata);
1155                 return -1;              
1156         }
1157
1158         recdata = params.recdata;
1159
1160         outdata.dptr = (void *)recdata;
1161         outdata.dsize = params.len;
1162
1163         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_PUSH_DB, nodemap, outdata, true) != 0) {
1164                 DEBUG(0,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1165                 talloc_free(recdata);
1166                 return -1;
1167         }
1168
1169         DEBUG(0, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1170                   dbid, recdata->count));
1171
1172         talloc_free(recdata);
1173
1174         return 0;
1175 }
1176
1177
1178 /*
1179   go through a full recovery on one database 
1180  */
1181 static int recover_database(struct ctdb_recoverd *rec, 
1182                             TALLOC_CTX *mem_ctx,
1183                             uint32_t dbid,
1184                             uint32_t pnn, 
1185                             struct ctdb_node_map *nodemap,
1186                             uint32_t transaction_id)
1187 {
1188         struct tdb_wrap *recdb;
1189         int ret;
1190         struct ctdb_context *ctdb = rec->ctdb;
1191         TDB_DATA data;
1192         struct ctdb_control_wipe_database w;
1193
1194         recdb = create_recdb(ctdb, mem_ctx);
1195         if (recdb == NULL) {
1196                 return -1;
1197         }
1198
1199         /* pull all remote databases onto the recdb */
1200         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1201         if (ret != 0) {
1202                 DEBUG(0, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1203                 return -1;
1204         }
1205
1206         DEBUG(0, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1207
1208         /* wipe all the remote databases. This is safe as we are in a transaction */
1209         w.db_id = dbid;
1210         w.transaction_id = transaction_id;
1211
1212         data.dptr = (void *)&w;
1213         data.dsize = sizeof(w);
1214
1215         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_WIPE_DATABASE, 
1216                                           nodemap, data, true) != 0) {
1217                 DEBUG(0, (__location__ " Unable to wipe database. Recovery failed.\n"));
1218                 return -1;
1219         }
1220         
1221         /* push out the correct database. This sets the dmaster and skips 
1222            the empty records */
1223         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1224         if (ret != 0) {
1225                 talloc_free(recdb);
1226                 return -1;
1227         }
1228
1229         /* all done with this database */
1230         talloc_free(recdb);
1231
1232         return 0;
1233 }
1234
1235                 
1236 /*
1237   we are the recmaster, and recovery is needed - start a recovery run
1238  */
1239 static int do_recovery(struct ctdb_recoverd *rec, 
1240                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
1241                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1242                        uint32_t culprit)
1243 {
1244         struct ctdb_context *ctdb = rec->ctdb;
1245         int i, j, ret;
1246         uint32_t generation;
1247         struct ctdb_dbid_map *dbmap;
1248         TDB_DATA data;
1249
1250         DEBUG(0, (__location__ " Starting do_recovery\n"));
1251
1252         /* if recovery fails, force it again */
1253         rec->need_recovery = true;
1254
1255         ctdb_set_culprit(rec, culprit);
1256
1257         if (rec->culprit_counter > 2*nodemap->num) {
1258                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1259                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1260                          ctdb->tunable.recovery_ban_period));
1261                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1262         }
1263
1264         if (!ctdb_recovery_lock(ctdb, true)) {
1265                 ctdb_set_culprit(rec, pnn);
1266                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
1267                 return -1;
1268         }
1269
1270         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1271
1272         /* get a list of all databases */
1273         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1274         if (ret != 0) {
1275                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
1276                 return -1;
1277         }
1278
1279         /* we do the db creation before we set the recovery mode, so the freeze happens
1280            on all databases we will be dealing with. */
1281
1282         /* verify that we have all the databases any other node has */
1283         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1284         if (ret != 0) {
1285                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
1286                 return -1;
1287         }
1288
1289         /* verify that all other nodes have all our databases */
1290         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1291         if (ret != 0) {
1292                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
1293                 return -1;
1294         }
1295
1296         DEBUG(0, (__location__ " Recovery - created remote databases\n"));
1297
1298         /* set recovery mode to active on all nodes */
1299         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1300         if (ret!=0) {
1301                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1302                 return -1;
1303         }
1304
1305         /* pick a new generation number */
1306         generation = new_generation();
1307
1308         /* change the vnnmap on this node to use the new generation 
1309            number but not on any other nodes.
1310            this guarantees that if we abort the recovery prematurely
1311            for some reason (a node stops responding?)
1312            that we can just return immediately and we will reenter
1313            recovery shortly again.
1314            I.e. we deliberately leave the cluster with an inconsistent
1315            generation id to allow us to abort recovery at any stage and
1316            just restart it from scratch.
1317          */
1318         vnnmap->generation = generation;
1319         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1320         if (ret != 0) {
1321                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1322                 return -1;
1323         }
1324
1325         data.dptr = (void *)&generation;
1326         data.dsize = sizeof(uint32_t);
1327
1328         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_TRANSACTION_START, 
1329                                           nodemap, data, true) != 0) {
1330                 DEBUG(0, (__location__ " Unable to start transactions. Recovery failed.\n"));
1331                 return -1;
1332         }
1333
1334         DEBUG(0,(__location__ " started transactions on all nodes\n"));
1335
1336         for (i=0;i<dbmap->num;i++) {
1337                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1338                         DEBUG(0, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1339                         return -1;
1340                 }
1341         }
1342
1343         DEBUG(0, (__location__ " Recovery - starting database commits\n"));
1344
1345         /* commit all the changes */
1346         if (async_control_on_active_nodes(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT, 
1347                                           nodemap, data, true) != 0) {
1348                 DEBUG(0, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1349                 return -1;
1350         }
1351
1352         DEBUG(0, (__location__ " Recovery - committed databases\n"));
1353         
1354
1355         /* build a new vnn map with all the currently active and
1356            unbanned nodes */
1357         generation = new_generation();
1358         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1359         CTDB_NO_MEMORY(ctdb, vnnmap);
1360         vnnmap->generation = generation;
1361         vnnmap->size = num_active;
1362         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1363         for (i=j=0;i<nodemap->num;i++) {
1364                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1365                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
1366                 }
1367         }
1368
1369         /* update to the new vnnmap on all nodes */
1370         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1371         if (ret != 0) {
1372                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
1373                 return -1;
1374         }
1375
1376         DEBUG(0, (__location__ " Recovery - updated vnnmap\n"));
1377
1378         /* update recmaster to point to us for all nodes */
1379         ret = set_recovery_master(ctdb, nodemap, pnn);
1380         if (ret!=0) {
1381                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
1382                 return -1;
1383         }
1384
1385         DEBUG(0, (__location__ " Recovery - updated recmaster\n"));
1386
1387         /*
1388           update all nodes to have the same flags that we have
1389          */
1390         ret = update_flags_on_all_nodes(ctdb, nodemap);
1391         if (ret != 0) {
1392                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
1393                 return -1;
1394         }
1395         
1396         DEBUG(0, (__location__ " Recovery - updated flags\n"));
1397
1398         /*
1399           if enabled, tell nodes to takeover their public IPs
1400          */
1401         if (ctdb->vnn) {
1402                 rec->need_takeover_run = false;
1403                 ret = ctdb_takeover_run(ctdb, nodemap);
1404                 if (ret != 0) {
1405                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1406                         return -1;
1407                 }
1408                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
1409         }
1410
1411         /* disable recovery mode */
1412         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1413         if (ret!=0) {
1414                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1415                 return -1;
1416         }
1417
1418         /* send a message to all clients telling them that the cluster 
1419            has been reconfigured */
1420         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1421
1422         DEBUG(0, (__location__ " Recovery complete\n"));
1423
1424         rec->need_recovery = false;
1425
1426         /* We just finished a recovery successfully. 
1427            We now wait for rerecovery_timeout before we allow 
1428            another recovery to take place.
1429         */
1430         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1431         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1432         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1433
1434         return 0;
1435 }
1436
1437
1438 /*
1439   elections are won by first checking the number of connected nodes, then
1440   the priority time, then the pnn
1441  */
1442 struct election_message {
1443         uint32_t num_connected;
1444         struct timeval priority_time;
1445         uint32_t pnn;
1446         uint32_t node_flags;
1447 };
1448
1449 /*
1450   form this nodes election data
1451  */
1452 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1453 {
1454         int ret, i;
1455         struct ctdb_node_map *nodemap;
1456         struct ctdb_context *ctdb = rec->ctdb;
1457
1458         ZERO_STRUCTP(em);
1459
1460         em->pnn = rec->ctdb->pnn;
1461         em->priority_time = rec->priority_time;
1462         em->node_flags = rec->node_flags;
1463
1464         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1465         if (ret != 0) {
1466                 DEBUG(0,(__location__ " unable to get election data\n"));
1467                 return;
1468         }
1469
1470         for (i=0;i<nodemap->num;i++) {
1471                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1472                         em->num_connected++;
1473                 }
1474         }
1475         talloc_free(nodemap);
1476 }
1477
1478 /*
1479   see if the given election data wins
1480  */
1481 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1482 {
1483         struct election_message myem;
1484         int cmp = 0;
1485
1486         ctdb_election_data(rec, &myem);
1487
1488         /* we cant win if we are banned */
1489         if (rec->node_flags & NODE_FLAGS_BANNED) {
1490                 return false;
1491         }       
1492
1493         /* we will automatically win if the other node is banned */
1494         if (em->node_flags & NODE_FLAGS_BANNED) {
1495                 return true;
1496         }
1497
1498         /* try to use the most connected node */
1499         if (cmp == 0) {
1500                 cmp = (int)myem.num_connected - (int)em->num_connected;
1501         }
1502
1503         /* then the longest running node */
1504         if (cmp == 0) {
1505                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1506         }
1507
1508         if (cmp == 0) {
1509                 cmp = (int)myem.pnn - (int)em->pnn;
1510         }
1511
1512         return cmp > 0;
1513 }
1514
1515 /*
1516   send out an election request
1517  */
1518 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1519 {
1520         int ret;
1521         TDB_DATA election_data;
1522         struct election_message emsg;
1523         uint64_t srvid;
1524         struct ctdb_context *ctdb = rec->ctdb;
1525
1526         srvid = CTDB_SRVID_RECOVERY;
1527
1528         ctdb_election_data(rec, &emsg);
1529
1530         election_data.dsize = sizeof(struct election_message);
1531         election_data.dptr  = (unsigned char *)&emsg;
1532
1533
1534         /* first we assume we will win the election and set 
1535            recoverymaster to be ourself on the current node
1536          */
1537         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1538         if (ret != 0) {
1539                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1540                 return -1;
1541         }
1542
1543
1544         /* send an election message to all active nodes */
1545         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1546
1547         return 0;
1548 }
1549
1550 /*
1551   this function will unban all nodes in the cluster
1552 */
1553 static void unban_all_nodes(struct ctdb_context *ctdb)
1554 {
1555         int ret, i;
1556         struct ctdb_node_map *nodemap;
1557         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1558         
1559         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1560         if (ret != 0) {
1561                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1562                 return;
1563         }
1564
1565         for (i=0;i<nodemap->num;i++) {
1566                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1567                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1568                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1569                 }
1570         }
1571
1572         talloc_free(tmp_ctx);
1573 }
1574
1575
1576 /*
1577   we think we are winning the election - send a broadcast election request
1578  */
1579 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1580 {
1581         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1582         int ret;
1583
1584         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1585         if (ret != 0) {
1586                 DEBUG(0,("Failed to send election request!\n"));
1587         }
1588
1589         talloc_free(rec->send_election_te);
1590         rec->send_election_te = NULL;
1591 }
1592
1593 /*
1594   handler for recovery master elections
1595 */
1596 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1597                              TDB_DATA data, void *private_data)
1598 {
1599         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1600         int ret;
1601         struct election_message *em = (struct election_message *)data.dptr;
1602         TALLOC_CTX *mem_ctx;
1603
1604         /* we got an election packet - update the timeout for the election */
1605         talloc_free(rec->election_timeout);
1606         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1607                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1608                                                 ctdb_election_timeout, rec);
1609
1610         mem_ctx = talloc_new(ctdb);
1611
1612         /* someone called an election. check their election data
1613            and if we disagree and we would rather be the elected node, 
1614            send a new election message to all other nodes
1615          */
1616         if (ctdb_election_win(rec, em)) {
1617                 if (!rec->send_election_te) {
1618                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1619                                                                 timeval_current_ofs(0, 500000),
1620                                                                 election_send_request, rec);
1621                 }
1622                 talloc_free(mem_ctx);
1623                 /*unban_all_nodes(ctdb);*/
1624                 return;
1625         }
1626         
1627         /* we didn't win */
1628         talloc_free(rec->send_election_te);
1629         rec->send_election_te = NULL;
1630
1631         /* release the recmaster lock */
1632         if (em->pnn != ctdb->pnn &&
1633             ctdb->recovery_lock_fd != -1) {
1634                 close(ctdb->recovery_lock_fd);
1635                 ctdb->recovery_lock_fd = -1;
1636                 unban_all_nodes(ctdb);
1637         }
1638
1639         /* ok, let that guy become recmaster then */
1640         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1641         if (ret != 0) {
1642                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1643                 talloc_free(mem_ctx);
1644                 return;
1645         }
1646
1647         /* release any bans */
1648         rec->last_culprit = (uint32_t)-1;
1649         talloc_free(rec->banned_nodes);
1650         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1651         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1652
1653         talloc_free(mem_ctx);
1654         return;
1655 }
1656
1657
1658 /*
1659   force the start of the election process
1660  */
1661 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1662                            struct ctdb_node_map *nodemap)
1663 {
1664         int ret;
1665         struct ctdb_context *ctdb = rec->ctdb;
1666
1667         /* set all nodes to recovery mode to stop all internode traffic */
1668         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1669         if (ret!=0) {
1670                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1671                 return;
1672         }
1673
1674         talloc_free(rec->election_timeout);
1675         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1676                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1677                                                 ctdb_election_timeout, rec);
1678
1679         ret = send_election_request(rec, pnn);
1680         if (ret!=0) {
1681                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1682                 return;
1683         }
1684
1685         /* wait for a few seconds to collect all responses */
1686         ctdb_wait_election(rec);
1687 }
1688
1689
1690
1691 /*
1692   handler for when a node changes its flags
1693 */
1694 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1695                             TDB_DATA data, void *private_data)
1696 {
1697         int ret;
1698         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1699         struct ctdb_node_map *nodemap=NULL;
1700         TALLOC_CTX *tmp_ctx;
1701         uint32_t changed_flags;
1702         int i;
1703         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1704
1705         if (data.dsize != sizeof(*c)) {
1706                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1707                 return;
1708         }
1709
1710         tmp_ctx = talloc_new(ctdb);
1711         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1712
1713         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1714         if (ret != 0) {
1715                 DEBUG(0,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1716                 talloc_free(tmp_ctx);
1717                 return;         
1718         }
1719
1720
1721         for (i=0;i<nodemap->num;i++) {
1722                 if (nodemap->nodes[i].pnn == c->pnn) break;
1723         }
1724
1725         if (i == nodemap->num) {
1726                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1727                 talloc_free(tmp_ctx);
1728                 return;
1729         }
1730
1731         changed_flags = c->old_flags ^ c->new_flags;
1732
1733         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1734            This flag is handled locally based on whether the local node
1735            can communicate with the node or not.
1736         */
1737         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1738         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1739                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1740         }
1741
1742         if (nodemap->nodes[i].flags != c->new_flags) {
1743                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1744         }
1745
1746         nodemap->nodes[i].flags = c->new_flags;
1747
1748         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1749                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1750
1751         if (ret == 0) {
1752                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1753                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1754         }
1755         
1756         if (ret == 0 &&
1757             ctdb->recovery_master == ctdb->pnn &&
1758             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1759             ctdb->vnn) {
1760                 /* Only do the takeover run if the perm disabled or unhealthy
1761                    flags changed since these will cause an ip failover but not
1762                    a recovery.
1763                    If the node became disconnected or banned this will also
1764                    lead to an ip address failover but that is handled 
1765                    during recovery
1766                 */
1767                 if (changed_flags & NODE_FLAGS_DISABLED) {
1768                         rec->need_takeover_run = true;
1769                 }
1770         }
1771
1772         talloc_free(tmp_ctx);
1773 }
1774
1775
1776
1777 struct verify_recmode_normal_data {
1778         uint32_t count;
1779         enum monitor_result status;
1780 };
1781
1782 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1783 {
1784         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1785
1786
1787         /* one more node has responded with recmode data*/
1788         rmdata->count--;
1789
1790         /* if we failed to get the recmode, then return an error and let
1791            the main loop try again.
1792         */
1793         if (state->state != CTDB_CONTROL_DONE) {
1794                 if (rmdata->status == MONITOR_OK) {
1795                         rmdata->status = MONITOR_FAILED;
1796                 }
1797                 return;
1798         }
1799
1800         /* if we got a response, then the recmode will be stored in the
1801            status field
1802         */
1803         if (state->status != CTDB_RECOVERY_NORMAL) {
1804                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1805                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1806         }
1807
1808         return;
1809 }
1810
1811
1812 /* verify that all nodes are in normal recovery mode */
1813 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1814 {
1815         struct verify_recmode_normal_data *rmdata;
1816         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1817         struct ctdb_client_control_state *state;
1818         enum monitor_result status;
1819         int j;
1820         
1821         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1822         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1823         rmdata->count  = 0;
1824         rmdata->status = MONITOR_OK;
1825
1826         /* loop over all active nodes and send an async getrecmode call to 
1827            them*/
1828         for (j=0; j<nodemap->num; j++) {
1829                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1830                         continue;
1831                 }
1832                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1833                                         CONTROL_TIMEOUT(), 
1834                                         nodemap->nodes[j].pnn);
1835                 if (state == NULL) {
1836                         /* we failed to send the control, treat this as 
1837                            an error and try again next iteration
1838                         */                      
1839                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1840                         talloc_free(mem_ctx);
1841                         return MONITOR_FAILED;
1842                 }
1843
1844                 /* set up the callback functions */
1845                 state->async.fn = verify_recmode_normal_callback;
1846                 state->async.private_data = rmdata;
1847
1848                 /* one more control to wait for to complete */
1849                 rmdata->count++;
1850         }
1851
1852
1853         /* now wait for up to the maximum number of seconds allowed
1854            or until all nodes we expect a response from has replied
1855         */
1856         while (rmdata->count > 0) {
1857                 event_loop_once(ctdb->ev);
1858         }
1859
1860         status = rmdata->status;
1861         talloc_free(mem_ctx);
1862         return status;
1863 }
1864
1865
1866 struct verify_recmaster_data {
1867         uint32_t count;
1868         uint32_t pnn;
1869         enum monitor_result status;
1870 };
1871
1872 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1873 {
1874         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1875
1876
1877         /* one more node has responded with recmaster data*/
1878         rmdata->count--;
1879
1880         /* if we failed to get the recmaster, then return an error and let
1881            the main loop try again.
1882         */
1883         if (state->state != CTDB_CONTROL_DONE) {
1884                 if (rmdata->status == MONITOR_OK) {
1885                         rmdata->status = MONITOR_FAILED;
1886                 }
1887                 return;
1888         }
1889
1890         /* if we got a response, then the recmaster will be stored in the
1891            status field
1892         */
1893         if (state->status != rmdata->pnn) {
1894                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1895                 rmdata->status = MONITOR_ELECTION_NEEDED;
1896         }
1897
1898         return;
1899 }
1900
1901
1902 /* verify that all nodes agree that we are the recmaster */
1903 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1904 {
1905         struct verify_recmaster_data *rmdata;
1906         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1907         struct ctdb_client_control_state *state;
1908         enum monitor_result status;
1909         int j;
1910         
1911         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1912         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1913         rmdata->count  = 0;
1914         rmdata->pnn    = pnn;
1915         rmdata->status = MONITOR_OK;
1916
1917         /* loop over all active nodes and send an async getrecmaster call to 
1918            them*/
1919         for (j=0; j<nodemap->num; j++) {
1920                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1921                         continue;
1922                 }
1923                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1924                                         CONTROL_TIMEOUT(),
1925                                         nodemap->nodes[j].pnn);
1926                 if (state == NULL) {
1927                         /* we failed to send the control, treat this as 
1928                            an error and try again next iteration
1929                         */                      
1930                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1931                         talloc_free(mem_ctx);
1932                         return MONITOR_FAILED;
1933                 }
1934
1935                 /* set up the callback functions */
1936                 state->async.fn = verify_recmaster_callback;
1937                 state->async.private_data = rmdata;
1938
1939                 /* one more control to wait for to complete */
1940                 rmdata->count++;
1941         }
1942
1943
1944         /* now wait for up to the maximum number of seconds allowed
1945            or until all nodes we expect a response from has replied
1946         */
1947         while (rmdata->count > 0) {
1948                 event_loop_once(ctdb->ev);
1949         }
1950
1951         status = rmdata->status;
1952         talloc_free(mem_ctx);
1953         return status;
1954 }
1955
1956
1957 /*
1958   the main monitoring loop
1959  */
1960 static void monitor_cluster(struct ctdb_context *ctdb)
1961 {
1962         uint32_t pnn, num_active, recmaster;
1963         TALLOC_CTX *mem_ctx=NULL;
1964         struct ctdb_node_map *nodemap=NULL;
1965         struct ctdb_node_map *remote_nodemap=NULL;
1966         struct ctdb_vnn_map *vnnmap=NULL;
1967         struct ctdb_vnn_map *remote_vnnmap=NULL;
1968         int i, j, ret;
1969         struct ctdb_recoverd *rec;
1970         struct ctdb_all_public_ips *ips;
1971         char c;
1972
1973         DEBUG(0,("monitor_cluster starting\n"));
1974
1975         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1976         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1977
1978         rec->ctdb = ctdb;
1979         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1980         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1981
1982         rec->priority_time = timeval_current();
1983
1984         /* register a message port for recovery elections */
1985         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1986
1987         /* and one for when nodes are disabled/enabled */
1988         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1989
1990         /* and one for when nodes are banned */
1991         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1992
1993         /* and one for when nodes are unbanned */
1994         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1995
1996         /* register a message port for vacuum fetch */
1997         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
1998         
1999 again:
2000         if (mem_ctx) {
2001                 talloc_free(mem_ctx);
2002                 mem_ctx = NULL;
2003         }
2004         mem_ctx = talloc_new(ctdb);
2005         if (!mem_ctx) {
2006                 DEBUG(0,("Failed to create temporary context\n"));
2007                 exit(-1);
2008         }
2009
2010         /* we only check for recovery once every second */
2011         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2012
2013         /* verify that the main daemon is still running */
2014         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2015                 DEBUG(0,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2016                 exit(-1);
2017         }
2018
2019         if (rec->election_timeout) {
2020                 /* an election is in progress */
2021                 goto again;
2022         }
2023
2024
2025         /* We must check if we need to ban a node here but we want to do this
2026            as early as possible so we dont wait until we have pulled the node
2027            map from the local node. thats why we have the hardcoded value 20
2028         */
2029         if (rec->culprit_counter > 20) {
2030                 DEBUG(0,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2031                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2032                          ctdb->tunable.recovery_ban_period));
2033                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2034         }
2035
2036         /* get relevant tunables */
2037         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2038         if (ret != 0) {
2039                 DEBUG(0,("Failed to get tunables - retrying\n"));
2040                 goto again;
2041         }
2042
2043         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2044         if (pnn == (uint32_t)-1) {
2045                 DEBUG(0,("Failed to get local pnn - retrying\n"));
2046                 goto again;
2047         }
2048
2049         /* get the vnnmap */
2050         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2051         if (ret != 0) {
2052                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2053                 goto again;
2054         }
2055
2056
2057         /* get number of nodes */
2058         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
2059         if (ret != 0) {
2060                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
2061                 goto again;
2062         }
2063
2064         /* check which node is the recovery master */
2065         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
2066         if (ret != 0) {
2067                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
2068                 goto again;
2069         }
2070
2071         if (recmaster == (uint32_t)-1) {
2072                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
2073                 force_election(rec, mem_ctx, pnn, nodemap);
2074                 goto again;
2075         }
2076         
2077         /* check that we (recovery daemon) and the local ctdb daemon
2078            agrees on whether we are banned or not
2079         */
2080         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2081                 if (rec->banned_nodes[pnn] == NULL) {
2082                         if (recmaster == pnn) {
2083                                 DEBUG(0,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2084
2085                                 ctdb_unban_node(rec, pnn);
2086                         } else {
2087                                 DEBUG(0,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2088                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2089                                 ctdb_set_culprit(rec, pnn);
2090                         }
2091                         goto again;
2092                 }
2093         } else {
2094                 if (rec->banned_nodes[pnn] != NULL) {
2095                         if (recmaster == pnn) {
2096                                 DEBUG(0,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2097
2098                                 ctdb_unban_node(rec, pnn);
2099                         } else {
2100                                 DEBUG(0,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2101
2102                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2103                                 ctdb_set_culprit(rec, pnn);
2104                         }
2105                         goto again;
2106                 }
2107         }
2108
2109         /* remember our own node flags */
2110         rec->node_flags = nodemap->nodes[pnn].flags;
2111
2112         /* count how many active nodes there are */
2113         num_active = 0;
2114         for (i=0; i<nodemap->num; i++) {
2115                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2116                         num_active++;
2117                 }
2118         }
2119
2120
2121         /* verify that the recmaster node is still active */
2122         for (j=0; j<nodemap->num; j++) {
2123                 if (nodemap->nodes[j].pnn==recmaster) {
2124                         break;
2125                 }
2126         }
2127
2128         if (j == nodemap->num) {
2129                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
2130                 force_election(rec, mem_ctx, pnn, nodemap);
2131                 goto again;
2132         }
2133
2134         /* if recovery master is disconnected we must elect a new recmaster */
2135         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2136                 DEBUG(0, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2137                 force_election(rec, mem_ctx, pnn, nodemap);
2138                 goto again;
2139         }
2140
2141         /* grap the nodemap from the recovery master to check if it is banned */
2142         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2143                                    mem_ctx, &remote_nodemap);
2144         if (ret != 0) {
2145                 DEBUG(0, (__location__ " Unable to get nodemap from recovery master %u\n", 
2146                           nodemap->nodes[j].pnn));
2147                 goto again;
2148         }
2149
2150
2151         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2152                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2153                 force_election(rec, mem_ctx, pnn, nodemap);
2154                 goto again;
2155         }
2156
2157         /* verify that the public ip address allocation is consistent */
2158         if (ctdb->vnn != NULL) {
2159                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2160                 if (ret != 0) {
2161                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
2162                         goto again;
2163                 }
2164                 for (j=0; j<ips->num; j++) {
2165                         /* verify that we have the ip addresses we should have
2166                            and we dont have ones we shouldnt have.
2167                            if we find an inconsistency we set recmode to
2168                            active on the local node and wait for the recmaster
2169                            to do a full blown recovery
2170                         */
2171                         if (ips->ips[j].pnn == pnn) {
2172                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
2173                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2174                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2175                                         if (ret != 0) {
2176                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2177                                                 goto again;
2178                                         }
2179                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2180                                         if (ret != 0) {
2181                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2182                                                 goto again;
2183                                         }
2184                                 }
2185                         } else {
2186                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
2187                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2188                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2189                                         if (ret != 0) {
2190                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2191                                                 goto again;
2192                                         }
2193                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2194                                         if (ret != 0) {
2195                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2196                                                 goto again;
2197                                         }
2198                                 }
2199                         }
2200                 }
2201         }
2202
2203         /* if we are not the recmaster then we do not need to check
2204            if recovery is needed
2205          */
2206         if (pnn != recmaster) {
2207                 goto again;
2208         }
2209
2210
2211         /* ensure our local copies of flags are right */
2212         ret = update_local_flags(rec, nodemap);
2213         if (ret == MONITOR_ELECTION_NEEDED) {
2214                 DEBUG(0,("update_local_flags() called for a re-election.\n"));
2215                 force_election(rec, mem_ctx, pnn, nodemap);
2216                 goto again;
2217         }
2218         if (ret != MONITOR_OK) {
2219                 DEBUG(0,("Unable to update local flags\n"));
2220                 goto again;
2221         }
2222
2223         /* update the list of public ips that a node can handle for
2224            all connected nodes
2225         */
2226         for (j=0; j<nodemap->num; j++) {
2227                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2228                         continue;
2229                 }
2230                 /* release any existing data */
2231                 if (ctdb->nodes[j]->public_ips) {
2232                         talloc_free(ctdb->nodes[j]->public_ips);
2233                         ctdb->nodes[j]->public_ips = NULL;
2234                 }
2235                 /* grab a new shiny list of public ips from the node */
2236                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2237                         ctdb->nodes[j]->pnn, 
2238                         ctdb->nodes,
2239                         &ctdb->nodes[j]->public_ips)) {
2240                         DEBUG(0,("Failed to read public ips from node : %u\n", 
2241                                 ctdb->nodes[j]->pnn));
2242                         goto again;
2243                 }
2244         }
2245
2246
2247         /* verify that all active nodes agree that we are the recmaster */
2248         switch (verify_recmaster(ctdb, nodemap, pnn)) {
2249         case MONITOR_RECOVERY_NEEDED:
2250                 /* can not happen */
2251                 goto again;
2252         case MONITOR_ELECTION_NEEDED:
2253                 force_election(rec, mem_ctx, pnn, nodemap);
2254                 goto again;
2255         case MONITOR_OK:
2256                 break;
2257         case MONITOR_FAILED:
2258                 goto again;
2259         }
2260
2261
2262         if (rec->need_recovery) {
2263                 /* a previous recovery didn't finish */
2264                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2265                 goto again;             
2266         }
2267
2268         /* verify that all active nodes are in normal mode 
2269            and not in recovery mode 
2270          */
2271         switch (verify_recmode(ctdb, nodemap)) {
2272         case MONITOR_RECOVERY_NEEDED:
2273                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2274                 goto again;
2275         case MONITOR_FAILED:
2276                 goto again;
2277         case MONITOR_ELECTION_NEEDED:
2278                 /* can not happen */
2279         case MONITOR_OK:
2280                 break;
2281         }
2282
2283
2284         /* we should have the reclock - check its not stale */
2285         if (ctdb->recovery_lock_fd == -1) {
2286                 DEBUG(0,("recovery master doesn't have the recovery lock\n"));
2287                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2288                 goto again;
2289         }
2290
2291         if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
2292                 DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2293                 close(ctdb->recovery_lock_fd);
2294                 ctdb->recovery_lock_fd = -1;
2295                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2296                 goto again;
2297         }
2298
2299         /* get the nodemap for all active remote nodes and verify
2300            they are the same as for this node
2301          */
2302         for (j=0; j<nodemap->num; j++) {
2303                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2304                         continue;
2305                 }
2306                 if (nodemap->nodes[j].pnn == pnn) {
2307                         continue;
2308                 }
2309
2310                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2311                                            mem_ctx, &remote_nodemap);
2312                 if (ret != 0) {
2313                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
2314                                   nodemap->nodes[j].pnn));
2315                         goto again;
2316                 }
2317
2318                 /* if the nodes disagree on how many nodes there are
2319                    then this is a good reason to try recovery
2320                  */
2321                 if (remote_nodemap->num != nodemap->num) {
2322                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2323                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2324                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2325                         goto again;
2326                 }
2327
2328                 /* if the nodes disagree on which nodes exist and are
2329                    active, then that is also a good reason to do recovery
2330                  */
2331                 for (i=0;i<nodemap->num;i++) {
2332                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2333                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2334                                           nodemap->nodes[j].pnn, i, 
2335                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2336                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2337                                             vnnmap, nodemap->nodes[j].pnn);
2338                                 goto again;
2339                         }
2340                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2341                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2342                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2343                                           nodemap->nodes[j].pnn, i,
2344                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2345                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2346                                             vnnmap, nodemap->nodes[j].pnn);
2347                                 goto again;
2348                         }
2349                 }
2350
2351         }
2352
2353
2354         /* there better be the same number of lmasters in the vnn map
2355            as there are active nodes or we will have to do a recovery
2356          */
2357         if (vnnmap->size != num_active) {
2358                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2359                           vnnmap->size, num_active));
2360                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2361                 goto again;
2362         }
2363
2364         /* verify that all active nodes in the nodemap also exist in 
2365            the vnnmap.
2366          */
2367         for (j=0; j<nodemap->num; j++) {
2368                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2369                         continue;
2370                 }
2371                 if (nodemap->nodes[j].pnn == pnn) {
2372                         continue;
2373                 }
2374
2375                 for (i=0; i<vnnmap->size; i++) {
2376                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2377                                 break;
2378                         }
2379                 }
2380                 if (i == vnnmap->size) {
2381                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2382                                   nodemap->nodes[j].pnn));
2383                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2384                         goto again;
2385                 }
2386         }
2387
2388         
2389         /* verify that all other nodes have the same vnnmap
2390            and are from the same generation
2391          */
2392         for (j=0; j<nodemap->num; j++) {
2393                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2394                         continue;
2395                 }
2396                 if (nodemap->nodes[j].pnn == pnn) {
2397                         continue;
2398                 }
2399
2400                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2401                                           mem_ctx, &remote_vnnmap);
2402                 if (ret != 0) {
2403                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
2404                                   nodemap->nodes[j].pnn));
2405                         goto again;
2406                 }
2407
2408                 /* verify the vnnmap generation is the same */
2409                 if (vnnmap->generation != remote_vnnmap->generation) {
2410                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2411                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2412                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2413                         goto again;
2414                 }
2415
2416                 /* verify the vnnmap size is the same */
2417                 if (vnnmap->size != remote_vnnmap->size) {
2418                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2419                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2420                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2421                         goto again;
2422                 }
2423
2424                 /* verify the vnnmap is the same */
2425                 for (i=0;i<vnnmap->size;i++) {
2426                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2427                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
2428                                           nodemap->nodes[j].pnn));
2429                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2430                                             vnnmap, nodemap->nodes[j].pnn);
2431                                 goto again;
2432                         }
2433                 }
2434         }
2435
2436         /* we might need to change who has what IP assigned */
2437         if (rec->need_takeover_run) {
2438                 rec->need_takeover_run = false;
2439                 ret = ctdb_takeover_run(ctdb, nodemap);
2440                 if (ret != 0) {
2441                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2442                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2443                                     vnnmap, ctdb->pnn);
2444                 }
2445         }
2446
2447         goto again;
2448
2449 }
2450
2451 /*
2452   event handler for when the main ctdbd dies
2453  */
2454 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2455                                  uint16_t flags, void *private_data)
2456 {
2457         DEBUG(0,("recovery daemon parent died - exiting\n"));
2458         _exit(1);
2459 }
2460
2461 /*
2462   startup the recovery daemon as a child of the main ctdb daemon
2463  */
2464 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2465 {
2466         int ret;
2467         int fd[2];
2468
2469         if (pipe(fd) != 0) {
2470                 return -1;
2471         }
2472
2473         ctdb->ctdbd_pid = getpid();
2474
2475         ctdb->recoverd_pid = fork();
2476         if (ctdb->recoverd_pid == -1) {
2477                 return -1;
2478         }
2479         
2480         if (ctdb->recoverd_pid != 0) {
2481                 close(fd[0]);
2482                 return 0;
2483         }
2484
2485         close(fd[1]);
2486
2487         /* shutdown the transport */
2488         ctdb->methods->shutdown(ctdb);
2489
2490         /* get a new event context */
2491         talloc_free(ctdb->ev);
2492         ctdb->ev = event_context_init(ctdb);
2493
2494         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2495                      ctdb_recoverd_parent, &fd[0]);     
2496
2497         close(ctdb->daemon.sd);
2498         ctdb->daemon.sd = -1;
2499
2500         srandom(getpid() ^ time(NULL));
2501
2502         /* initialise ctdb */
2503         ret = ctdb_socket_connect(ctdb);
2504         if (ret != 0) {
2505                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
2506                 exit(1);
2507         }
2508
2509         monitor_cluster(ctdb);
2510
2511         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
2512         return -1;
2513 }
2514
2515 /*
2516   shutdown the recovery daemon
2517  */
2518 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2519 {
2520         if (ctdb->recoverd_pid == 0) {
2521                 return;
2522         }
2523
2524         DEBUG(0,("Shutting down recovery daemon\n"));
2525         kill(ctdb->recoverd_pid, SIGTERM);
2526 }