fixed a warning
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31
32 struct ban_state {
33         struct ctdb_recoverd *rec;
34         uint32_t banned_node;
35 };
36
37 /*
38   private state of recovery daemon
39  */
40 struct ctdb_recoverd {
41         struct ctdb_context *ctdb;
42         uint32_t last_culprit;
43         uint32_t culprit_counter;
44         struct timeval first_recover_time;
45         struct ban_state **banned_nodes;
46         struct timeval priority_time;
47         bool need_takeover_run;
48         bool need_recovery;
49         uint32_t node_flags;
50         struct timed_event *send_election_te;
51         struct timed_event *election_timeout;
52 };
53
54 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
55 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
56
57 /*
58   unban a node
59  */
60 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
61 {
62         struct ctdb_context *ctdb = rec->ctdb;
63
64         DEBUG(0,("Unbanning node %u\n", pnn));
65
66         if (!ctdb_validate_pnn(ctdb, pnn)) {
67                 DEBUG(0,("Bad pnn %u in ctdb_unban_node\n", pnn));
68                 return;
69         }
70
71         /* If we are unbanning a different node then just pass the ban info on */
72         if (pnn != ctdb->pnn) {
73                 TDB_DATA data;
74                 int ret;
75                 
76                 DEBUG(0,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
77
78                 data.dptr = (uint8_t *)&pnn;
79                 data.dsize = sizeof(uint32_t);
80
81                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
82                 if (ret != 0) {
83                         DEBUG(0,("Failed to unban node %u\n", pnn));
84                         return;
85                 }
86
87                 return;
88         }
89
90         /* make sure we remember we are no longer banned in case 
91            there is an election */
92         rec->node_flags &= ~NODE_FLAGS_BANNED;
93
94         DEBUG(0,("Clearing ban flag on node %u\n", pnn));
95         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
96
97         if (rec->banned_nodes[pnn] == NULL) {
98                 DEBUG(0,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
99                 return;
100         }
101
102         talloc_free(rec->banned_nodes[pnn]);
103         rec->banned_nodes[pnn] = NULL;
104 }
105
106
107 /*
108   called when a ban has timed out
109  */
110 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
111 {
112         struct ban_state *state = talloc_get_type(p, struct ban_state);
113         struct ctdb_recoverd *rec = state->rec;
114         uint32_t pnn = state->banned_node;
115
116         DEBUG(0,("Ban timeout. Node %u is now unbanned\n", pnn));
117         ctdb_unban_node(rec, pnn);
118 }
119
120 /*
121   ban a node for a period of time
122  */
123 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
124 {
125         struct ctdb_context *ctdb = rec->ctdb;
126
127         DEBUG(0,("Banning node %u for %u seconds\n", pnn, ban_time));
128
129         if (!ctdb_validate_pnn(ctdb, pnn)) {
130                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
131                 return;
132         }
133
134         if (0 == ctdb->tunable.enable_bans) {
135                 DEBUG(0,("Bans are disabled - ignoring ban of node %u\n", pnn));
136                 return;
137         }
138
139         /* If we are banning a different node then just pass the ban info on */
140         if (pnn != ctdb->pnn) {
141                 struct ctdb_ban_info b;
142                 TDB_DATA data;
143                 int ret;
144                 
145                 DEBUG(0,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
146
147                 b.pnn = pnn;
148                 b.ban_time = ban_time;
149
150                 data.dptr = (uint8_t *)&b;
151                 data.dsize = sizeof(b);
152
153                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
154                 if (ret != 0) {
155                         DEBUG(0,("Failed to ban node %u\n", pnn));
156                         return;
157                 }
158
159                 return;
160         }
161
162         DEBUG(0,("self ban - lowering our election priority\n"));
163         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
164
165         /* banning ourselves - lower our election priority */
166         rec->priority_time = timeval_current();
167
168         /* make sure we remember we are banned in case there is an 
169            election */
170         rec->node_flags |= NODE_FLAGS_BANNED;
171
172         if (rec->banned_nodes[pnn] != NULL) {
173                 DEBUG(0,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));               
174                 talloc_free(rec->banned_nodes[pnn]);
175                 rec->banned_nodes[pnn] = NULL;
176         }
177
178         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
179         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
180
181         rec->banned_nodes[pnn]->rec = rec;
182         rec->banned_nodes[pnn]->banned_node = pnn;
183
184         if (ban_time != 0) {
185                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
186                                 timeval_current_ofs(ban_time, 0),
187                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
188         }
189 }
190
191 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
192
193
194 struct freeze_node_data {
195         uint32_t count;
196         enum monitor_result status;
197 };
198
199
200 static void freeze_node_callback(struct ctdb_client_control_state *state)
201 {
202         struct freeze_node_data *fndata = talloc_get_type(state->async.private_data, struct freeze_node_data);
203
204
205         /* one more node has responded to our freeze node*/
206         fndata->count--;
207
208         /* if we failed to freeze the node, we must trigger another recovery */
209         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
210                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
211                 fndata->status = MONITOR_RECOVERY_NEEDED;
212         }
213
214         return;
215 }
216
217
218
219 /* freeze all nodes */
220 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
221 {
222         struct freeze_node_data *fndata;
223         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
224         struct ctdb_client_control_state *state;
225         enum monitor_result status;
226         int j;
227         
228         fndata = talloc(mem_ctx, struct freeze_node_data);
229         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
230         fndata->count  = 0;
231         fndata->status = MONITOR_OK;
232
233         /* loop over all active nodes and send an async freeze call to 
234            them*/
235         for (j=0; j<nodemap->num; j++) {
236                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
237                         continue;
238                 }
239                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
240                                         CONTROL_TIMEOUT(), 
241                                         nodemap->nodes[j].pnn);
242                 if (state == NULL) {
243                         /* we failed to send the control, treat this as 
244                            an error and try again next iteration
245                         */                      
246                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
247                         talloc_free(mem_ctx);
248                         return MONITOR_RECOVERY_NEEDED;
249                 }
250
251                 /* set up the callback functions */
252                 state->async.fn = freeze_node_callback;
253                 state->async.private_data = fndata;
254
255                 /* one more control to wait for to complete */
256                 fndata->count++;
257         }
258
259
260         /* now wait for up to the maximum number of seconds allowed
261            or until all nodes we expect a response from has replied
262         */
263         while (fndata->count > 0) {
264                 event_loop_once(ctdb->ev);
265         }
266
267         status = fndata->status;
268         talloc_free(mem_ctx);
269         return status;
270 }
271
272
273 /*
274   change recovery mode on all nodes
275  */
276 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
277 {
278         int j, ret;
279
280         /* freeze all nodes */
281         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
282                 ret = freeze_all_nodes(ctdb, nodemap);
283                 if (ret != MONITOR_OK) {
284                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
285                         return -1;
286                 }
287         }
288
289
290         /* set recovery mode to active on all nodes */
291         for (j=0; j<nodemap->num; j++) {
292                 /* dont change it for nodes that are unavailable */
293                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
294                         continue;
295                 }
296
297                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
298                 if (ret != 0) {
299                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
300                         return -1;
301                 }
302
303                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
304                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
305                         if (ret != 0) {
306                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
307                                 return -1;
308                         }
309                 }
310         }
311
312         return 0;
313 }
314
315 /*
316   change recovery master on all node
317  */
318 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
319 {
320         int j, ret;
321
322         /* set recovery master to pnn on all nodes */
323         for (j=0; j<nodemap->num; j++) {
324                 /* dont change it for nodes that are unavailable */
325                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
326                         continue;
327                 }
328
329                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
330                 if (ret != 0) {
331                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
332                         return -1;
333                 }
334         }
335
336         return 0;
337 }
338
339
340 /*
341   ensure all other nodes have attached to any databases that we have
342  */
343 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
344                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
345 {
346         int i, j, db, ret;
347         struct ctdb_dbid_map *remote_dbmap;
348
349         /* verify that all other nodes have all our databases */
350         for (j=0; j<nodemap->num; j++) {
351                 /* we dont need to ourself ourselves */
352                 if (nodemap->nodes[j].pnn == pnn) {
353                         continue;
354                 }
355                 /* dont check nodes that are unavailable */
356                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
357                         continue;
358                 }
359
360                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
361                                          mem_ctx, &remote_dbmap);
362                 if (ret != 0) {
363                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
364                         return -1;
365                 }
366
367                 /* step through all local databases */
368                 for (db=0; db<dbmap->num;db++) {
369                         const char *name;
370
371
372                         for (i=0;i<remote_dbmap->num;i++) {
373                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
374                                         break;
375                                 }
376                         }
377                         /* the remote node already have this database */
378                         if (i!=remote_dbmap->num) {
379                                 continue;
380                         }
381                         /* ok so we need to create this database */
382                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
383                                             mem_ctx, &name);
384                         if (ret != 0) {
385                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
386                                 return -1;
387                         }
388                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
389                                            mem_ctx, name, dbmap->dbs[db].persistent);
390                         if (ret != 0) {
391                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
392                                 return -1;
393                         }
394                 }
395         }
396
397         return 0;
398 }
399
400
401 /*
402   ensure we are attached to any databases that anyone else is attached to
403  */
404 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
405                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
406 {
407         int i, j, db, ret;
408         struct ctdb_dbid_map *remote_dbmap;
409
410         /* verify that we have all database any other node has */
411         for (j=0; j<nodemap->num; j++) {
412                 /* we dont need to ourself ourselves */
413                 if (nodemap->nodes[j].pnn == pnn) {
414                         continue;
415                 }
416                 /* dont check nodes that are unavailable */
417                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
418                         continue;
419                 }
420
421                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
422                                          mem_ctx, &remote_dbmap);
423                 if (ret != 0) {
424                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
425                         return -1;
426                 }
427
428                 /* step through all databases on the remote node */
429                 for (db=0; db<remote_dbmap->num;db++) {
430                         const char *name;
431
432                         for (i=0;i<(*dbmap)->num;i++) {
433                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
434                                         break;
435                                 }
436                         }
437                         /* we already have this db locally */
438                         if (i!=(*dbmap)->num) {
439                                 continue;
440                         }
441                         /* ok so we need to create this database and
442                            rebuild dbmap
443                          */
444                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
445                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
446                         if (ret != 0) {
447                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
448                                           nodemap->nodes[j].pnn));
449                                 return -1;
450                         }
451                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
452                                            remote_dbmap->dbs[db].persistent);
453                         if (ret != 0) {
454                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
455                                 return -1;
456                         }
457                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
458                         if (ret != 0) {
459                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
460                                 return -1;
461                         }
462                 }
463         }
464
465         return 0;
466 }
467
468
469 /*
470   pull all the remote database contents into ours
471  */
472 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
473                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
474 {
475         int i, j, ret;
476
477         /* pull all records from all other nodes across onto this node
478            (this merges based on rsn)
479         */
480         for (i=0;i<dbmap->num;i++) {
481                 for (j=0; j<nodemap->num; j++) {
482                         /* we dont need to merge with ourselves */
483                         if (nodemap->nodes[j].pnn == pnn) {
484                                 continue;
485                         }
486                         /* dont merge from nodes that are unavailable */
487                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
488                                 continue;
489                         }
490                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
491                                                pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
492                         if (ret != 0) {
493                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
494                                           nodemap->nodes[j].pnn, pnn));
495                                 return -1;
496                         }
497                 }
498         }
499
500         return 0;
501 }
502
503
504 /*
505   change the dmaster on all databases to point to us
506  */
507 static int update_dmaster_on_our_databases(struct ctdb_context *ctdb, uint32_t pnn, 
508                                            struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
509 {
510         int i, ret;
511
512         /* update dmaster to point to this node for all databases/nodes */
513         for (i=0;i<dbmap->num;i++) {
514                 ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), pnn, 
515                                            ctdb, dbmap->dbs[i].dbid, pnn);
516                 if (ret != 0) {
517                         DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", 
518                                   pnn, dbmap->dbs[i].dbid));
519                         return -1;
520                 }
521         }
522
523         return 0;
524 }
525
526
527 /*
528   update flags on all active nodes
529  */
530 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
531 {
532         int i;
533         for (i=0;i<nodemap->num;i++) {
534                 struct ctdb_node_flag_change c;
535                 TDB_DATA data;
536
537                 c.pnn = nodemap->nodes[i].pnn;
538                 c.old_flags = nodemap->nodes[i].flags;
539                 c.new_flags = nodemap->nodes[i].flags;
540
541                 data.dptr = (uint8_t *)&c;
542                 data.dsize = sizeof(c);
543
544                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
545                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
546
547         }
548         return 0;
549 }
550
551 /*
552   vacuum one database
553  */
554 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
555 {
556         uint64_t max_rsn;
557         int ret, i;
558
559         /* find max rsn on our local node for this db */
560         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
561         if (ret != 0) {
562                 return -1;
563         }
564
565         /* set rsn on non-empty records to max_rsn+1 */
566         for (i=0;i<nodemap->num;i++) {
567                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
568                         continue;
569                 }
570                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
571                                                  db_id, max_rsn+1);
572                 if (ret != 0) {
573                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
574                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
575                         return -1;
576                 }
577         }
578
579         /* delete records with rsn < max_rsn+1 on all nodes */
580         for (i=0;i<nodemap->num;i++) {
581                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
582                         continue;
583                 }
584                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
585                                                  db_id, max_rsn+1);
586                 if (ret != 0) {
587                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
588                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
589                         return -1;
590                 }
591         }
592
593
594         return 0;
595 }
596
597
598 /*
599   vacuum all attached databases
600  */
601 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
602                                 struct ctdb_dbid_map *dbmap)
603 {
604         int i;
605
606         /* update dmaster to point to this node for all databases/nodes */
607         for (i=0;i<dbmap->num;i++) {
608                 if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
609                         return -1;
610                 }
611         }
612         return 0;
613 }
614
615
616 /*
617   push out all our database contents to all other nodes
618  */
619 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
620                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
621 {
622         int i, j, ret;
623
624         /* push all records out to the nodes again */
625         for (i=0;i<dbmap->num;i++) {
626                 for (j=0; j<nodemap->num; j++) {
627                         /* we dont need to push to ourselves */
628                         if (nodemap->nodes[j].pnn == pnn) {
629                                 continue;
630                         }
631                         /* dont push to nodes that are unavailable */
632                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
633                                 continue;
634                         }
635                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
636                                                dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
637                         if (ret != 0) {
638                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
639                                           pnn, nodemap->nodes[j].pnn));
640                                 return -1;
641                         }
642                 }
643         }
644
645         return 0;
646 }
647
648
649 /*
650   ensure all nodes have the same vnnmap we do
651  */
652 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
653                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
654 {
655         int j, ret;
656
657         /* push the new vnn map out to all the nodes */
658         for (j=0; j<nodemap->num; j++) {
659                 /* dont push to nodes that are unavailable */
660                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
661                         continue;
662                 }
663
664                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
665                 if (ret != 0) {
666                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
667                         return -1;
668                 }
669         }
670
671         return 0;
672 }
673
674
675 /*
676   handler for when the admin bans a node
677 */
678 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
679                         TDB_DATA data, void *private_data)
680 {
681         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
682         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
683         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
684
685         if (data.dsize != sizeof(*b)) {
686                 DEBUG(0,("Bad data in ban_handler\n"));
687                 talloc_free(mem_ctx);
688                 return;
689         }
690
691         if (b->pnn != ctdb->pnn) {
692                 DEBUG(0,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
693                 return;
694         }
695
696         DEBUG(0,("Node %u has been banned for %u seconds\n", 
697                  b->pnn, b->ban_time));
698
699         ctdb_ban_node(rec, b->pnn, b->ban_time);
700         talloc_free(mem_ctx);
701 }
702
703 /*
704   handler for when the admin unbans a node
705 */
706 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
707                           TDB_DATA data, void *private_data)
708 {
709         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
710         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
711         uint32_t pnn;
712
713         if (data.dsize != sizeof(uint32_t)) {
714                 DEBUG(0,("Bad data in unban_handler\n"));
715                 talloc_free(mem_ctx);
716                 return;
717         }
718         pnn = *(uint32_t *)data.dptr;
719
720         if (pnn != ctdb->pnn) {
721                 DEBUG(0,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
722                 return;
723         }
724
725         DEBUG(0,("Node %u has been unbanned.\n", pnn));
726         ctdb_unban_node(rec, pnn);
727         talloc_free(mem_ctx);
728 }
729
730
731
732 /*
733   called when ctdb_wait_timeout should finish
734  */
735 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
736                               struct timeval yt, void *p)
737 {
738         uint32_t *timed_out = (uint32_t *)p;
739         (*timed_out) = 1;
740 }
741
742 /*
743   wait for a given number of seconds
744  */
745 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
746 {
747         uint32_t timed_out = 0;
748         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
749         while (!timed_out) {
750                 event_loop_once(ctdb->ev);
751         }
752 }
753
754 /*
755   called when an election times out (ends)
756  */
757 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
758                                   struct timeval t, void *p)
759 {
760         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
761         rec->election_timeout = NULL;
762 }
763
764
765 /*
766   wait for an election to finish. It finished election_timeout seconds after
767   the last election packet is received
768  */
769 static void ctdb_wait_election(struct ctdb_recoverd *rec)
770 {
771         struct ctdb_context *ctdb = rec->ctdb;
772         while (rec->election_timeout) {
773                 event_loop_once(ctdb->ev);
774         }
775 }
776
777 /*
778   remember the trouble maker
779  */
780 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
781 {
782         struct ctdb_context *ctdb = rec->ctdb;
783
784         if (rec->last_culprit != culprit ||
785             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
786                 DEBUG(0,("New recovery culprit %u\n", culprit));
787                 /* either a new node is the culprit, or we've decided to forgive them */
788                 rec->last_culprit = culprit;
789                 rec->first_recover_time = timeval_current();
790                 rec->culprit_counter = 0;
791         }
792         rec->culprit_counter++;
793 }
794
795 /*
796   Update our local flags from all remote connected nodes. 
797   This is only run when we are or we belive we are the recovery master
798  */
799 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
800 {
801         int j;
802         struct ctdb_context *ctdb = rec->ctdb;
803         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
804
805         /* get the nodemap for all active remote nodes and verify
806            they are the same as for this node
807          */
808         for (j=0; j<nodemap->num; j++) {
809                 struct ctdb_node_map *remote_nodemap=NULL;
810                 int ret;
811
812                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
813                         continue;
814                 }
815                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
816                         continue;
817                 }
818
819                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
820                                            mem_ctx, &remote_nodemap);
821                 if (ret != 0) {
822                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
823                                   nodemap->nodes[j].pnn));
824                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
825                         talloc_free(mem_ctx);
826                         return MONITOR_FAILED;
827                 }
828                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
829                         struct ctdb_node_flag_change c;
830                         TDB_DATA data;
831
832                         /* We should tell our daemon about this so it
833                            updates its flags or else we will log the same 
834                            message again in the next iteration of recovery.
835                            Since we are the recovery master we can just as
836                            well update the flags on all nodes.
837                         */
838                         c.pnn = nodemap->nodes[j].pnn;
839                         c.old_flags = nodemap->nodes[j].flags;
840                         c.new_flags = remote_nodemap->nodes[j].flags;
841
842                         data.dptr = (uint8_t *)&c;
843                         data.dsize = sizeof(c);
844
845                         ctdb_send_message(ctdb, ctdb->pnn,
846                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
847                                         data);
848
849                         /* Update our local copy of the flags in the recovery
850                            daemon.
851                         */
852                         DEBUG(0,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
853                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
854                                  nodemap->nodes[j].flags));
855                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
856
857                         /* If the BANNED flag has changed for the node
858                            this is a good reason to do a new election.
859                          */
860                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
861                                 DEBUG(0,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
862                                  nodemap->nodes[j].pnn, c.new_flags,
863                                  c.old_flags));
864                                 talloc_free(mem_ctx);
865                                 return MONITOR_ELECTION_NEEDED;
866                         }
867
868                 }
869                 talloc_free(remote_nodemap);
870         }
871         talloc_free(mem_ctx);
872         return MONITOR_OK;
873 }
874
875
876 /* Create a new random generation ip. 
877    The generation id can not be the INVALID_GENERATION id
878 */
879 static uint32_t new_generation(void)
880 {
881         uint32_t generation;
882
883         while (1) {
884                 generation = random();
885
886                 if (generation != INVALID_GENERATION) {
887                         break;
888                 }
889         }
890
891         return generation;
892 }
893
894                 
895 /*
896   we are the recmaster, and recovery is needed - start a recovery run
897  */
898 static int do_recovery(struct ctdb_recoverd *rec, 
899                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
900                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
901                        uint32_t culprit)
902 {
903         struct ctdb_context *ctdb = rec->ctdb;
904         int i, j, ret;
905         uint32_t generation;
906         struct ctdb_dbid_map *dbmap;
907
908         DEBUG(0, (__location__ " Starting do_recovery\n"));
909
910         /* if recovery fails, force it again */
911         rec->need_recovery = true;
912
913         ctdb_set_culprit(rec, culprit);
914
915         if (rec->culprit_counter > 2*nodemap->num) {
916                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
917                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
918                          ctdb->tunable.recovery_ban_period));
919                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
920         }
921
922         if (!ctdb_recovery_lock(ctdb, true)) {
923                 ctdb_set_culprit(rec, pnn);
924                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
925                 return -1;
926         }
927
928         /* set recovery mode to active on all nodes */
929         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
930         if (ret!=0) {
931                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
932                 return -1;
933         }
934
935         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
936
937         /* pick a new generation number */
938         generation = new_generation();
939
940         /* change the vnnmap on this node to use the new generation 
941            number but not on any other nodes.
942            this guarantees that if we abort the recovery prematurely
943            for some reason (a node stops responding?)
944            that we can just return immediately and we will reenter
945            recovery shortly again.
946            I.e. we deliberately leave the cluster with an inconsistent
947            generation id to allow us to abort recovery at any stage and
948            just restart it from scratch.
949          */
950         vnnmap->generation = generation;
951         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
952         if (ret != 0) {
953                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
954                 return -1;
955         }
956
957         /* get a list of all databases */
958         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
959         if (ret != 0) {
960                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
961                 return -1;
962         }
963
964
965
966         /* verify that all other nodes have all our databases */
967         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
968         if (ret != 0) {
969                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
970                 return -1;
971         }
972
973         /* verify that we have all the databases any other node has */
974         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
975         if (ret != 0) {
976                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
977                 return -1;
978         }
979
980
981
982         /* verify that all other nodes have all our databases */
983         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
984         if (ret != 0) {
985                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
986                 return -1;
987         }
988
989
990         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
991
992         /* pull all remote databases onto the local node */
993         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
994         if (ret != 0) {
995                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
996                 return -1;
997         }
998
999         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
1000
1001         /* repoint all local database records to the local node as
1002            being dmaster
1003          */
1004         ret = update_dmaster_on_our_databases(ctdb, pnn, dbmap, mem_ctx);
1005         if (ret != 0) {
1006                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
1007                 return -1;
1008         }
1009
1010         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
1011
1012
1013         /* push all local databases to the remote nodes */
1014         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1015         if (ret != 0) {
1016                 DEBUG(0, (__location__ " Unable to push local databases\n"));
1017                 return -1;
1018         }
1019
1020         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
1021
1022         /* build a new vnn map with all the currently active and
1023            unbanned nodes */
1024         generation = new_generation();
1025         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1026         CTDB_NO_MEMORY(ctdb, vnnmap);
1027         vnnmap->generation = generation;
1028         vnnmap->size = num_active;
1029         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1030         for (i=j=0;i<nodemap->num;i++) {
1031                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1032                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
1033                 }
1034         }
1035
1036
1037
1038         /* update to the new vnnmap on all nodes */
1039         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1040         if (ret != 0) {
1041                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
1042                 return -1;
1043         }
1044
1045         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
1046
1047         /* update recmaster to point to us for all nodes */
1048         ret = set_recovery_master(ctdb, nodemap, pnn);
1049         if (ret!=0) {
1050                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
1051                 return -1;
1052         }
1053
1054         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
1055
1056         /*
1057           update all nodes to have the same flags that we have
1058          */
1059         ret = update_flags_on_all_nodes(ctdb, nodemap);
1060         if (ret != 0) {
1061                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
1062                 return -1;
1063         }
1064         
1065         DEBUG(1, (__location__ " Recovery - updated flags\n"));
1066
1067         /*
1068           run a vacuum operation on empty records
1069          */
1070         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
1071         if (ret != 0) {
1072                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
1073                 return -1;
1074         }
1075
1076         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
1077
1078         /*
1079           if enabled, tell nodes to takeover their public IPs
1080          */
1081         if (ctdb->vnn) {
1082                 rec->need_takeover_run = false;
1083                 ret = ctdb_takeover_run(ctdb, nodemap);
1084                 if (ret != 0) {
1085                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1086                         return -1;
1087                 }
1088                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
1089         }
1090
1091         for (i=0;i<dbmap->num;i++) {
1092                 DEBUG(0,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
1093         }
1094
1095         /* disable recovery mode */
1096         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1097         if (ret!=0) {
1098                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1099                 return -1;
1100         }
1101
1102         /* send a message to all clients telling them that the cluster 
1103            has been reconfigured */
1104         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1105
1106         DEBUG(0, (__location__ " Recovery complete\n"));
1107
1108         rec->need_recovery = false;
1109
1110         /* We just finished a recovery successfully. 
1111            We now wait for rerecovery_timeout before we allow 
1112            another recovery to take place.
1113         */
1114         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1115         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1116         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1117
1118         return 0;
1119 }
1120
1121
1122 /*
1123   elections are won by first checking the number of connected nodes, then
1124   the priority time, then the pnn
1125  */
1126 struct election_message {
1127         uint32_t num_connected;
1128         struct timeval priority_time;
1129         uint32_t pnn;
1130         uint32_t node_flags;
1131 };
1132
1133 /*
1134   form this nodes election data
1135  */
1136 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1137 {
1138         int ret, i;
1139         struct ctdb_node_map *nodemap;
1140         struct ctdb_context *ctdb = rec->ctdb;
1141
1142         ZERO_STRUCTP(em);
1143
1144         em->pnn = rec->ctdb->pnn;
1145         em->priority_time = rec->priority_time;
1146         em->node_flags = rec->node_flags;
1147
1148         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1149         if (ret != 0) {
1150                 DEBUG(0,(__location__ " unable to get election data\n"));
1151                 return;
1152         }
1153
1154         for (i=0;i<nodemap->num;i++) {
1155                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1156                         em->num_connected++;
1157                 }
1158         }
1159         talloc_free(nodemap);
1160 }
1161
1162 /*
1163   see if the given election data wins
1164  */
1165 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1166 {
1167         struct election_message myem;
1168         int cmp = 0;
1169
1170         ctdb_election_data(rec, &myem);
1171
1172         /* we cant win if we are banned */
1173         if (rec->node_flags & NODE_FLAGS_BANNED) {
1174                 return false;
1175         }       
1176
1177         /* we will automatically win if the other node is banned */
1178         if (em->node_flags & NODE_FLAGS_BANNED) {
1179                 return true;
1180         }
1181
1182         /* try to use the most connected node */
1183         if (cmp == 0) {
1184                 cmp = (int)myem.num_connected - (int)em->num_connected;
1185         }
1186
1187         /* then the longest running node */
1188         if (cmp == 0) {
1189                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1190         }
1191
1192         if (cmp == 0) {
1193                 cmp = (int)myem.pnn - (int)em->pnn;
1194         }
1195
1196         return cmp > 0;
1197 }
1198
1199 /*
1200   send out an election request
1201  */
1202 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
1203 {
1204         int ret;
1205         TDB_DATA election_data;
1206         struct election_message emsg;
1207         uint64_t srvid;
1208         struct ctdb_context *ctdb = rec->ctdb;
1209
1210         srvid = CTDB_SRVID_RECOVERY;
1211
1212         ctdb_election_data(rec, &emsg);
1213
1214         election_data.dsize = sizeof(struct election_message);
1215         election_data.dptr  = (unsigned char *)&emsg;
1216
1217
1218         /* first we assume we will win the election and set 
1219            recoverymaster to be ourself on the current node
1220          */
1221         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1222         if (ret != 0) {
1223                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1224                 return -1;
1225         }
1226
1227
1228         /* send an election message to all active nodes */
1229         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1230
1231         return 0;
1232 }
1233
1234 /*
1235   this function will unban all nodes in the cluster
1236 */
1237 static void unban_all_nodes(struct ctdb_context *ctdb)
1238 {
1239         int ret, i;
1240         struct ctdb_node_map *nodemap;
1241         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1242         
1243         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1244         if (ret != 0) {
1245                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1246                 return;
1247         }
1248
1249         for (i=0;i<nodemap->num;i++) {
1250                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1251                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1252                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1253                 }
1254         }
1255
1256         talloc_free(tmp_ctx);
1257 }
1258
1259
1260 /*
1261   we think we are winning the election - send a broadcast election request
1262  */
1263 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1264 {
1265         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1266         int ret;
1267
1268         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
1269         if (ret != 0) {
1270                 DEBUG(0,("Failed to send election request!\n"));
1271         }
1272
1273         talloc_free(rec->send_election_te);
1274         rec->send_election_te = NULL;
1275 }
1276
1277 /*
1278   handler for recovery master elections
1279 */
1280 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1281                              TDB_DATA data, void *private_data)
1282 {
1283         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1284         int ret;
1285         struct election_message *em = (struct election_message *)data.dptr;
1286         TALLOC_CTX *mem_ctx;
1287
1288         /* we got an election packet - update the timeout for the election */
1289         talloc_free(rec->election_timeout);
1290         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1291                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1292                                                 ctdb_election_timeout, rec);
1293
1294         mem_ctx = talloc_new(ctdb);
1295
1296         /* someone called an election. check their election data
1297            and if we disagree and we would rather be the elected node, 
1298            send a new election message to all other nodes
1299          */
1300         if (ctdb_election_win(rec, em)) {
1301                 if (!rec->send_election_te) {
1302                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1303                                                                 timeval_current_ofs(0, 500000),
1304                                                                 election_send_request, rec);
1305                 }
1306                 talloc_free(mem_ctx);
1307                 /*unban_all_nodes(ctdb);*/
1308                 return;
1309         }
1310         
1311         /* we didn't win */
1312         talloc_free(rec->send_election_te);
1313         rec->send_election_te = NULL;
1314
1315         /* release the recmaster lock */
1316         if (em->pnn != ctdb->pnn &&
1317             ctdb->recovery_lock_fd != -1) {
1318                 close(ctdb->recovery_lock_fd);
1319                 ctdb->recovery_lock_fd = -1;
1320                 unban_all_nodes(ctdb);
1321         }
1322
1323         /* ok, let that guy become recmaster then */
1324         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1325         if (ret != 0) {
1326                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1327                 talloc_free(mem_ctx);
1328                 return;
1329         }
1330
1331         /* release any bans */
1332         rec->last_culprit = (uint32_t)-1;
1333         talloc_free(rec->banned_nodes);
1334         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1335         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1336
1337         talloc_free(mem_ctx);
1338         return;
1339 }
1340
1341
1342 /*
1343   force the start of the election process
1344  */
1345 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1346                            struct ctdb_node_map *nodemap)
1347 {
1348         int ret;
1349         struct ctdb_context *ctdb = rec->ctdb;
1350
1351         /* set all nodes to recovery mode to stop all internode traffic */
1352         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1353         if (ret!=0) {
1354                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1355                 return;
1356         }
1357
1358         talloc_free(rec->election_timeout);
1359         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1360                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1361                                                 ctdb_election_timeout, rec);
1362
1363         ret = send_election_request(rec, pnn);
1364         if (ret!=0) {
1365                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1366                 return;
1367         }
1368
1369         /* wait for a few seconds to collect all responses */
1370         ctdb_wait_election(rec);
1371 }
1372
1373
1374
1375 /*
1376   handler for when a node changes its flags
1377 */
1378 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1379                             TDB_DATA data, void *private_data)
1380 {
1381         int ret;
1382         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1383         struct ctdb_node_map *nodemap=NULL;
1384         TALLOC_CTX *tmp_ctx;
1385         uint32_t changed_flags;
1386         int i;
1387         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1388
1389         if (data.dsize != sizeof(*c)) {
1390                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1391                 return;
1392         }
1393
1394         tmp_ctx = talloc_new(ctdb);
1395         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1396
1397         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1398         if (ret != 0) {
1399                 DEBUG(0,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1400                 talloc_free(tmp_ctx);
1401                 return;         
1402         }
1403
1404
1405         for (i=0;i<nodemap->num;i++) {
1406                 if (nodemap->nodes[i].pnn == c->pnn) break;
1407         }
1408
1409         if (i == nodemap->num) {
1410                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1411                 talloc_free(tmp_ctx);
1412                 return;
1413         }
1414
1415         changed_flags = c->old_flags ^ c->new_flags;
1416
1417         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1418            This flag is handled locally based on whether the local node
1419            can communicate with the node or not.
1420         */
1421         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1422         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1423                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1424         }
1425
1426         if (nodemap->nodes[i].flags != c->new_flags) {
1427                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1428         }
1429
1430         nodemap->nodes[i].flags = c->new_flags;
1431
1432         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1433                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1434
1435         if (ret == 0) {
1436                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1437                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1438         }
1439         
1440         if (ret == 0 &&
1441             ctdb->recovery_master == ctdb->pnn &&
1442             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1443             ctdb->vnn) {
1444                 /* Only do the takeover run if the perm disabled or unhealthy
1445                    flags changed since these will cause an ip failover but not
1446                    a recovery.
1447                    If the node became disconnected or banned this will also
1448                    lead to an ip address failover but that is handled 
1449                    during recovery
1450                 */
1451                 if (changed_flags & NODE_FLAGS_DISABLED) {
1452                         rec->need_takeover_run = true;
1453                 }
1454         }
1455
1456         talloc_free(tmp_ctx);
1457 }
1458
1459
1460
1461 struct verify_recmode_normal_data {
1462         uint32_t count;
1463         enum monitor_result status;
1464 };
1465
1466 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1467 {
1468         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1469
1470
1471         /* one more node has responded with recmode data*/
1472         rmdata->count--;
1473
1474         /* if we failed to get the recmode, then return an error and let
1475            the main loop try again.
1476         */
1477         if (state->state != CTDB_CONTROL_DONE) {
1478                 if (rmdata->status == MONITOR_OK) {
1479                         rmdata->status = MONITOR_FAILED;
1480                 }
1481                 return;
1482         }
1483
1484         /* if we got a response, then the recmode will be stored in the
1485            status field
1486         */
1487         if (state->status != CTDB_RECOVERY_NORMAL) {
1488                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1489                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1490         }
1491
1492         return;
1493 }
1494
1495
1496 /* verify that all nodes are in normal recovery mode */
1497 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1498 {
1499         struct verify_recmode_normal_data *rmdata;
1500         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1501         struct ctdb_client_control_state *state;
1502         enum monitor_result status;
1503         int j;
1504         
1505         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1506         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1507         rmdata->count  = 0;
1508         rmdata->status = MONITOR_OK;
1509
1510         /* loop over all active nodes and send an async getrecmode call to 
1511            them*/
1512         for (j=0; j<nodemap->num; j++) {
1513                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1514                         continue;
1515                 }
1516                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1517                                         CONTROL_TIMEOUT(), 
1518                                         nodemap->nodes[j].pnn);
1519                 if (state == NULL) {
1520                         /* we failed to send the control, treat this as 
1521                            an error and try again next iteration
1522                         */                      
1523                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1524                         talloc_free(mem_ctx);
1525                         return MONITOR_FAILED;
1526                 }
1527
1528                 /* set up the callback functions */
1529                 state->async.fn = verify_recmode_normal_callback;
1530                 state->async.private_data = rmdata;
1531
1532                 /* one more control to wait for to complete */
1533                 rmdata->count++;
1534         }
1535
1536
1537         /* now wait for up to the maximum number of seconds allowed
1538            or until all nodes we expect a response from has replied
1539         */
1540         while (rmdata->count > 0) {
1541                 event_loop_once(ctdb->ev);
1542         }
1543
1544         status = rmdata->status;
1545         talloc_free(mem_ctx);
1546         return status;
1547 }
1548
1549
1550 struct verify_recmaster_data {
1551         uint32_t count;
1552         uint32_t pnn;
1553         enum monitor_result status;
1554 };
1555
1556 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1557 {
1558         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1559
1560
1561         /* one more node has responded with recmaster data*/
1562         rmdata->count--;
1563
1564         /* if we failed to get the recmaster, then return an error and let
1565            the main loop try again.
1566         */
1567         if (state->state != CTDB_CONTROL_DONE) {
1568                 if (rmdata->status == MONITOR_OK) {
1569                         rmdata->status = MONITOR_FAILED;
1570                 }
1571                 return;
1572         }
1573
1574         /* if we got a response, then the recmaster will be stored in the
1575            status field
1576         */
1577         if (state->status != rmdata->pnn) {
1578                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1579                 rmdata->status = MONITOR_ELECTION_NEEDED;
1580         }
1581
1582         return;
1583 }
1584
1585
1586 /* verify that all nodes agree that we are the recmaster */
1587 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1588 {
1589         struct verify_recmaster_data *rmdata;
1590         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1591         struct ctdb_client_control_state *state;
1592         enum monitor_result status;
1593         int j;
1594         
1595         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1596         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1597         rmdata->count  = 0;
1598         rmdata->pnn    = pnn;
1599         rmdata->status = MONITOR_OK;
1600
1601         /* loop over all active nodes and send an async getrecmaster call to 
1602            them*/
1603         for (j=0; j<nodemap->num; j++) {
1604                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1605                         continue;
1606                 }
1607                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1608                                         CONTROL_TIMEOUT(),
1609                                         nodemap->nodes[j].pnn);
1610                 if (state == NULL) {
1611                         /* we failed to send the control, treat this as 
1612                            an error and try again next iteration
1613                         */                      
1614                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1615                         talloc_free(mem_ctx);
1616                         return MONITOR_FAILED;
1617                 }
1618
1619                 /* set up the callback functions */
1620                 state->async.fn = verify_recmaster_callback;
1621                 state->async.private_data = rmdata;
1622
1623                 /* one more control to wait for to complete */
1624                 rmdata->count++;
1625         }
1626
1627
1628         /* now wait for up to the maximum number of seconds allowed
1629            or until all nodes we expect a response from has replied
1630         */
1631         while (rmdata->count > 0) {
1632                 event_loop_once(ctdb->ev);
1633         }
1634
1635         status = rmdata->status;
1636         talloc_free(mem_ctx);
1637         return status;
1638 }
1639
1640
1641 /*
1642   the main monitoring loop
1643  */
1644 static void monitor_cluster(struct ctdb_context *ctdb)
1645 {
1646         uint32_t pnn, num_active, recmaster;
1647         TALLOC_CTX *mem_ctx=NULL;
1648         struct ctdb_node_map *nodemap=NULL;
1649         struct ctdb_node_map *remote_nodemap=NULL;
1650         struct ctdb_vnn_map *vnnmap=NULL;
1651         struct ctdb_vnn_map *remote_vnnmap=NULL;
1652         int i, j, ret;
1653         struct ctdb_recoverd *rec;
1654         struct ctdb_all_public_ips *ips;
1655         char c;
1656
1657         DEBUG(0,("monitor_cluster starting\n"));
1658
1659         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1660         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1661
1662         rec->ctdb = ctdb;
1663         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1664         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1665
1666         rec->priority_time = timeval_current();
1667
1668         /* register a message port for recovery elections */
1669         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1670
1671         /* and one for when nodes are disabled/enabled */
1672         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1673
1674         /* and one for when nodes are banned */
1675         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1676
1677         /* and one for when nodes are unbanned */
1678         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1679         
1680 again:
1681         if (mem_ctx) {
1682                 talloc_free(mem_ctx);
1683                 mem_ctx = NULL;
1684         }
1685         mem_ctx = talloc_new(ctdb);
1686         if (!mem_ctx) {
1687                 DEBUG(0,("Failed to create temporary context\n"));
1688                 exit(-1);
1689         }
1690
1691         /* we only check for recovery once every second */
1692         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1693
1694         if (rec->election_timeout) {
1695                 /* an election is in progress */
1696                 goto again;
1697         }
1698
1699
1700         /* We must check if we need to ban a node here but we want to do this
1701            as early as possible so we dont wait until we have pulled the node
1702            map from the local node. thats why we have the hardcoded value 20
1703         */
1704         if (rec->culprit_counter > 20) {
1705                 DEBUG(0,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
1706                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1707                          ctdb->tunable.recovery_ban_period));
1708                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
1709         }
1710
1711         /* get relevant tunables */
1712         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1713         if (ret != 0) {
1714                 DEBUG(0,("Failed to get tunables - retrying\n"));
1715                 goto again;
1716         }
1717
1718         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1719         if (pnn == (uint32_t)-1) {
1720                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1721                 goto again;
1722         }
1723
1724         /* get the vnnmap */
1725         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1726         if (ret != 0) {
1727                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1728                 goto again;
1729         }
1730
1731
1732         /* get number of nodes */
1733         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1734         if (ret != 0) {
1735                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1736                 goto again;
1737         }
1738
1739         /* check which node is the recovery master */
1740         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1741         if (ret != 0) {
1742                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1743                 goto again;
1744         }
1745
1746         if (recmaster == (uint32_t)-1) {
1747                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1748                 force_election(rec, mem_ctx, pnn, nodemap);
1749                 goto again;
1750         }
1751         
1752         /* check that we (recovery daemon) and the local ctdb daemon
1753            agrees on whether we are banned or not
1754         */
1755         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
1756                 if (rec->banned_nodes[pnn] == NULL) {
1757                         if (recmaster == pnn) {
1758                                 DEBUG(0,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
1759
1760                                 ctdb_unban_node(rec, pnn);
1761                         } else {
1762                                 DEBUG(0,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
1763                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1764                                 ctdb_set_culprit(rec, pnn);
1765                         }
1766                         goto again;
1767                 }
1768         } else {
1769                 if (rec->banned_nodes[pnn] != NULL) {
1770                         if (recmaster == pnn) {
1771                                 DEBUG(0,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
1772
1773                                 ctdb_unban_node(rec, pnn);
1774                         } else {
1775                                 DEBUG(0,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
1776
1777                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1778                                 ctdb_set_culprit(rec, pnn);
1779                         }
1780                         goto again;
1781                 }
1782         }
1783
1784         /* remember our own node flags */
1785         rec->node_flags = nodemap->nodes[pnn].flags;
1786
1787         /* count how many active nodes there are */
1788         num_active = 0;
1789         for (i=0; i<nodemap->num; i++) {
1790                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1791                         num_active++;
1792                 }
1793         }
1794
1795
1796         /* verify that the recmaster node is still active */
1797         for (j=0; j<nodemap->num; j++) {
1798                 if (nodemap->nodes[j].pnn==recmaster) {
1799                         break;
1800                 }
1801         }
1802
1803         if (j == nodemap->num) {
1804                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1805                 force_election(rec, mem_ctx, pnn, nodemap);
1806                 goto again;
1807         }
1808
1809         /* if recovery master is disconnected we must elect a new recmaster */
1810         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1811                 DEBUG(0, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
1812                 force_election(rec, mem_ctx, pnn, nodemap);
1813                 goto again;
1814         }
1815
1816         /* grap the nodemap from the recovery master to check if it is banned */
1817         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1818                                    mem_ctx, &remote_nodemap);
1819         if (ret != 0) {
1820                 DEBUG(0, (__location__ " Unable to get nodemap from recovery master %u\n", 
1821                           nodemap->nodes[j].pnn));
1822                 goto again;
1823         }
1824
1825
1826         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1827                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1828                 force_election(rec, mem_ctx, pnn, nodemap);
1829                 goto again;
1830         }
1831
1832         /* verify that the public ip address allocation is consistent */
1833         if (ctdb->vnn != NULL) {
1834                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
1835                 if (ret != 0) {
1836                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
1837                         goto again;
1838                 }
1839                 for (j=0; j<ips->num; j++) {
1840                         /* verify that we have the ip addresses we should have
1841                            and we dont have ones we shouldnt have.
1842                            if we find an inconsistency we set recmode to
1843                            active on the local node and wait for the recmaster
1844                            to do a full blown recovery
1845                         */
1846                         if (ips->ips[j].pnn == pnn) {
1847                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
1848                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1849                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1850                                         if (ret != 0) {
1851                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1852                                                 goto again;
1853                                         }
1854                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1855                                         if (ret != 0) {
1856                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1857                                                 goto again;
1858                                         }
1859                                 }
1860                         } else {
1861                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
1862                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1863                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1864                                         if (ret != 0) {
1865                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1866                                                 goto again;
1867                                         }
1868                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1869                                         if (ret != 0) {
1870                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1871                                                 goto again;
1872                                         }
1873                                 }
1874                         }
1875                 }
1876         }
1877
1878         /* if we are not the recmaster then we do not need to check
1879            if recovery is needed
1880          */
1881         if (pnn != recmaster) {
1882                 goto again;
1883         }
1884
1885
1886         /* ensure our local copies of flags are right */
1887         ret = update_local_flags(rec, nodemap);
1888         if (ret == MONITOR_ELECTION_NEEDED) {
1889                 DEBUG(0,("update_local_flags() called for a re-election.\n"));
1890                 force_election(rec, mem_ctx, pnn, nodemap);
1891                 goto again;
1892         }
1893         if (ret != MONITOR_OK) {
1894                 DEBUG(0,("Unable to update local flags\n"));
1895                 goto again;
1896         }
1897
1898         /* update the list of public ips that a node can handle for
1899            all connected nodes
1900         */
1901         for (j=0; j<nodemap->num; j++) {
1902                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1903                         continue;
1904                 }
1905                 /* release any existing data */
1906                 if (ctdb->nodes[j]->public_ips) {
1907                         talloc_free(ctdb->nodes[j]->public_ips);
1908                         ctdb->nodes[j]->public_ips = NULL;
1909                 }
1910                 /* grab a new shiny list of public ips from the node */
1911                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1912                         ctdb->nodes[j]->pnn, 
1913                         ctdb->nodes,
1914                         &ctdb->nodes[j]->public_ips)) {
1915                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1916                                 ctdb->nodes[j]->pnn));
1917                         goto again;
1918                 }
1919         }
1920
1921
1922         /* verify that all active nodes agree that we are the recmaster */
1923         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1924         case MONITOR_RECOVERY_NEEDED:
1925                 /* can not happen */
1926                 goto again;
1927         case MONITOR_ELECTION_NEEDED:
1928                 force_election(rec, mem_ctx, pnn, nodemap);
1929                 goto again;
1930         case MONITOR_OK:
1931                 break;
1932         case MONITOR_FAILED:
1933                 goto again;
1934         }
1935
1936
1937         if (rec->need_recovery) {
1938                 /* a previous recovery didn't finish */
1939                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1940                 goto again;             
1941         }
1942
1943         /* verify that all active nodes are in normal mode 
1944            and not in recovery mode 
1945          */
1946         switch (verify_recmode(ctdb, nodemap)) {
1947         case MONITOR_RECOVERY_NEEDED:
1948                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1949                 goto again;
1950         case MONITOR_FAILED:
1951                 goto again;
1952         case MONITOR_ELECTION_NEEDED:
1953                 /* can not happen */
1954         case MONITOR_OK:
1955                 break;
1956         }
1957
1958
1959         /* we should have the reclock - check its not stale */
1960         if (ctdb->recovery_lock_fd == -1) {
1961                 DEBUG(0,("recovery master doesn't have the recovery lock\n"));
1962                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1963                 goto again;
1964         }
1965
1966         if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
1967                 DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
1968                 close(ctdb->recovery_lock_fd);
1969                 ctdb->recovery_lock_fd = -1;
1970                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1971                 goto again;
1972         }
1973
1974         /* get the nodemap for all active remote nodes and verify
1975            they are the same as for this node
1976          */
1977         for (j=0; j<nodemap->num; j++) {
1978                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1979                         continue;
1980                 }
1981                 if (nodemap->nodes[j].pnn == pnn) {
1982                         continue;
1983                 }
1984
1985                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1986                                            mem_ctx, &remote_nodemap);
1987                 if (ret != 0) {
1988                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1989                                   nodemap->nodes[j].pnn));
1990                         goto again;
1991                 }
1992
1993                 /* if the nodes disagree on how many nodes there are
1994                    then this is a good reason to try recovery
1995                  */
1996                 if (remote_nodemap->num != nodemap->num) {
1997                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1998                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1999                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2000                         goto again;
2001                 }
2002
2003                 /* if the nodes disagree on which nodes exist and are
2004                    active, then that is also a good reason to do recovery
2005                  */
2006                 for (i=0;i<nodemap->num;i++) {
2007                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2008                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2009                                           nodemap->nodes[j].pnn, i, 
2010                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2011                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2012                                             vnnmap, nodemap->nodes[j].pnn);
2013                                 goto again;
2014                         }
2015                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2016                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2017                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2018                                           nodemap->nodes[j].pnn, i,
2019                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2020                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2021                                             vnnmap, nodemap->nodes[j].pnn);
2022                                 goto again;
2023                         }
2024                 }
2025
2026         }
2027
2028
2029         /* there better be the same number of lmasters in the vnn map
2030            as there are active nodes or we will have to do a recovery
2031          */
2032         if (vnnmap->size != num_active) {
2033                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2034                           vnnmap->size, num_active));
2035                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
2036                 goto again;
2037         }
2038
2039         /* verify that all active nodes in the nodemap also exist in 
2040            the vnnmap.
2041          */
2042         for (j=0; j<nodemap->num; j++) {
2043                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2044                         continue;
2045                 }
2046                 if (nodemap->nodes[j].pnn == pnn) {
2047                         continue;
2048                 }
2049
2050                 for (i=0; i<vnnmap->size; i++) {
2051                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2052                                 break;
2053                         }
2054                 }
2055                 if (i == vnnmap->size) {
2056                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2057                                   nodemap->nodes[j].pnn));
2058                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2059                         goto again;
2060                 }
2061         }
2062
2063         
2064         /* verify that all other nodes have the same vnnmap
2065            and are from the same generation
2066          */
2067         for (j=0; j<nodemap->num; j++) {
2068                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2069                         continue;
2070                 }
2071                 if (nodemap->nodes[j].pnn == pnn) {
2072                         continue;
2073                 }
2074
2075                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2076                                           mem_ctx, &remote_vnnmap);
2077                 if (ret != 0) {
2078                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
2079                                   nodemap->nodes[j].pnn));
2080                         goto again;
2081                 }
2082
2083                 /* verify the vnnmap generation is the same */
2084                 if (vnnmap->generation != remote_vnnmap->generation) {
2085                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2086                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2087                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2088                         goto again;
2089                 }
2090
2091                 /* verify the vnnmap size is the same */
2092                 if (vnnmap->size != remote_vnnmap->size) {
2093                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2094                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2095                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
2096                         goto again;
2097                 }
2098
2099                 /* verify the vnnmap is the same */
2100                 for (i=0;i<vnnmap->size;i++) {
2101                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2102                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
2103                                           nodemap->nodes[j].pnn));
2104                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2105                                             vnnmap, nodemap->nodes[j].pnn);
2106                                 goto again;
2107                         }
2108                 }
2109         }
2110
2111         /* we might need to change who has what IP assigned */
2112         if (rec->need_takeover_run) {
2113                 rec->need_takeover_run = false;
2114                 ret = ctdb_takeover_run(ctdb, nodemap);
2115                 if (ret != 0) {
2116                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2117                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
2118                                     vnnmap, ctdb->pnn);
2119                 }
2120         }
2121
2122         goto again;
2123
2124 }
2125
2126 /*
2127   event handler for when the main ctdbd dies
2128  */
2129 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2130                                  uint16_t flags, void *private_data)
2131 {
2132         DEBUG(0,("recovery daemon parent died - exiting\n"));
2133         _exit(1);
2134 }
2135
2136 /*
2137   startup the recovery daemon as a child of the main ctdb daemon
2138  */
2139 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2140 {
2141         int ret;
2142         int fd[2];
2143
2144         if (pipe(fd) != 0) {
2145                 return -1;
2146         }
2147
2148         ctdb->recoverd_pid = fork();
2149         if (ctdb->recoverd_pid == -1) {
2150                 return -1;
2151         }
2152         
2153         if (ctdb->recoverd_pid != 0) {
2154                 close(fd[0]);
2155                 return 0;
2156         }
2157
2158         close(fd[1]);
2159
2160         /* shutdown the transport */
2161         ctdb->methods->shutdown(ctdb);
2162
2163         /* get a new event context */
2164         talloc_free(ctdb->ev);
2165         ctdb->ev = event_context_init(ctdb);
2166
2167         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2168                      ctdb_recoverd_parent, &fd[0]);     
2169
2170         close(ctdb->daemon.sd);
2171         ctdb->daemon.sd = -1;
2172
2173         srandom(getpid() ^ time(NULL));
2174
2175         /* initialise ctdb */
2176         ret = ctdb_socket_connect(ctdb);
2177         if (ret != 0) {
2178                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
2179                 exit(1);
2180         }
2181
2182         monitor_cluster(ctdb);
2183
2184         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
2185         return -1;
2186 }
2187
2188 /*
2189   shutdown the recovery daemon
2190  */
2191 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2192 {
2193         if (ctdb->recoverd_pid == 0) {
2194                 return;
2195         }
2196
2197         DEBUG(0,("Shutting down recovery daemon\n"));
2198         kill(ctdb->recoverd_pid, SIGTERM);
2199 }