sync flags between nodes in monitor loop in recmaster
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb.h"
28 #include "../include/ctdb_private.h"
29
30
31 struct ban_state {
32         struct ctdb_recoverd *rec;
33         uint32_t banned_node;
34 };
35
36 /*
37   private state of recovery daemon
38  */
39 struct ctdb_recoverd {
40         struct ctdb_context *ctdb;
41         uint32_t last_culprit;
42         uint32_t culprit_counter;
43         struct timeval first_recover_time;
44         struct ban_state **banned_nodes;
45         struct timeval priority_time;
46         bool need_takeover_run;
47         bool need_recovery;
48         uint32_t node_flags;
49 };
50
51 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
52 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
53
54 /*
55   unban a node
56  */
57 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
58 {
59         struct ctdb_context *ctdb = rec->ctdb;
60
61         if (!ctdb_validate_pnn(ctdb, pnn)) {
62                 DEBUG(0,("Bad pnn %u in ctdb_unban_node\n", pnn));
63                 return;
64         }
65
66         if (rec->banned_nodes[pnn] == NULL) {
67                 return;
68         }
69
70         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
71
72         talloc_free(rec->banned_nodes[pnn]);
73         rec->banned_nodes[pnn] = NULL;
74 }
75
76
77 /*
78   called when a ban has timed out
79  */
80 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
81 {
82         struct ban_state *state = talloc_get_type(p, struct ban_state);
83         struct ctdb_recoverd *rec = state->rec;
84         uint32_t pnn = state->banned_node;
85
86         DEBUG(0,("Node %u is now unbanned\n", pnn));
87         ctdb_unban_node(rec, pnn);
88 }
89
90 /*
91   ban a node for a period of time
92  */
93 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
94 {
95         struct ctdb_context *ctdb = rec->ctdb;
96
97         if (!ctdb_validate_pnn(ctdb, pnn)) {
98                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
99                 return;
100         }
101
102         if (0 == ctdb->tunable.enable_bans) {
103                 DEBUG(0,("Bans are disabled - ignoring ban of node %u\n", pnn));
104                 return;
105         }
106
107         if (pnn == ctdb->pnn) {
108                 DEBUG(0,("self ban - lowering our election priority\n"));
109                 /* banning ourselves - lower our election priority */
110                 rec->priority_time = timeval_current();
111         }
112
113         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
114
115         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
116         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
117
118         rec->banned_nodes[pnn]->rec = rec;
119         rec->banned_nodes[pnn]->banned_node = pnn;
120
121         if (ban_time != 0) {
122                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
123                                 timeval_current_ofs(ban_time, 0),
124                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
125         }
126 }
127
128 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
129
130
131 struct freeze_node_data {
132         uint32_t count;
133         enum monitor_result status;
134 };
135
136
137 static void freeze_node_callback(struct ctdb_client_control_state *state)
138 {
139         struct freeze_node_data *fndata = talloc_get_type(state->async.private_data, struct freeze_node_data);
140
141
142         /* one more node has responded to our freeze node*/
143         fndata->count--;
144
145         /* if we failed to freeze the node, we must trigger another recovery */
146         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
147                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
148                 fndata->status = MONITOR_RECOVERY_NEEDED;
149         }
150
151         return;
152 }
153
154
155
156 /* freeze all nodes */
157 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
158 {
159         struct freeze_node_data *fndata;
160         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
161         struct ctdb_client_control_state *state;
162         enum monitor_result status;
163         int j;
164         
165         fndata = talloc(mem_ctx, struct freeze_node_data);
166         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
167         fndata->count  = 0;
168         fndata->status = MONITOR_OK;
169
170         /* loop over all active nodes and send an async freeze call to 
171            them*/
172         for (j=0; j<nodemap->num; j++) {
173                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
174                         continue;
175                 }
176                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
177                                         CONTROL_TIMEOUT(), 
178                                         nodemap->nodes[j].pnn);
179                 if (state == NULL) {
180                         /* we failed to send the control, treat this as 
181                            an error and try again next iteration
182                         */                      
183                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
184                         talloc_free(mem_ctx);
185                         return MONITOR_RECOVERY_NEEDED;
186                 }
187
188                 /* set up the callback functions */
189                 state->async.fn = freeze_node_callback;
190                 state->async.private_data = fndata;
191
192                 /* one more control to wait for to complete */
193                 fndata->count++;
194         }
195
196
197         /* now wait for up to the maximum number of seconds allowed
198            or until all nodes we expect a response from has replied
199         */
200         while (fndata->count > 0) {
201                 event_loop_once(ctdb->ev);
202         }
203
204         status = fndata->status;
205         talloc_free(mem_ctx);
206         return status;
207 }
208
209
210 /*
211   change recovery mode on all nodes
212  */
213 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
214 {
215         int j, ret;
216
217         /* freeze all nodes */
218         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
219                 ret = freeze_all_nodes(ctdb, nodemap);
220                 if (ret != MONITOR_OK) {
221                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
222                         return -1;
223                 }
224         }
225
226
227         /* set recovery mode to active on all nodes */
228         for (j=0; j<nodemap->num; j++) {
229                 /* dont change it for nodes that are unavailable */
230                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
231                         continue;
232                 }
233
234                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
235                 if (ret != 0) {
236                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
237                         return -1;
238                 }
239
240                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
241                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
242                         if (ret != 0) {
243                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
244                                 return -1;
245                         }
246                 }
247         }
248
249         return 0;
250 }
251
252 /*
253   change recovery master on all node
254  */
255 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
256 {
257         int j, ret;
258
259         /* set recovery master to pnn on all nodes */
260         for (j=0; j<nodemap->num; j++) {
261                 /* dont change it for nodes that are unavailable */
262                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
263                         continue;
264                 }
265
266                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
267                 if (ret != 0) {
268                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
269                         return -1;
270                 }
271         }
272
273         return 0;
274 }
275
276
277 /*
278   ensure all other nodes have attached to any databases that we have
279  */
280 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
281                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
282 {
283         int i, j, db, ret;
284         struct ctdb_dbid_map *remote_dbmap;
285
286         /* verify that all other nodes have all our databases */
287         for (j=0; j<nodemap->num; j++) {
288                 /* we dont need to ourself ourselves */
289                 if (nodemap->nodes[j].pnn == pnn) {
290                         continue;
291                 }
292                 /* dont check nodes that are unavailable */
293                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
294                         continue;
295                 }
296
297                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
298                                          mem_ctx, &remote_dbmap);
299                 if (ret != 0) {
300                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
301                         return -1;
302                 }
303
304                 /* step through all local databases */
305                 for (db=0; db<dbmap->num;db++) {
306                         const char *name;
307
308
309                         for (i=0;i<remote_dbmap->num;i++) {
310                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
311                                         break;
312                                 }
313                         }
314                         /* the remote node already have this database */
315                         if (i!=remote_dbmap->num) {
316                                 continue;
317                         }
318                         /* ok so we need to create this database */
319                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
320                                             mem_ctx, &name);
321                         if (ret != 0) {
322                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
323                                 return -1;
324                         }
325                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
326                                            mem_ctx, name, dbmap->dbs[db].persistent);
327                         if (ret != 0) {
328                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
329                                 return -1;
330                         }
331                 }
332         }
333
334         return 0;
335 }
336
337
338 /*
339   ensure we are attached to any databases that anyone else is attached to
340  */
341 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
342                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
343 {
344         int i, j, db, ret;
345         struct ctdb_dbid_map *remote_dbmap;
346
347         /* verify that we have all database any other node has */
348         for (j=0; j<nodemap->num; j++) {
349                 /* we dont need to ourself ourselves */
350                 if (nodemap->nodes[j].pnn == pnn) {
351                         continue;
352                 }
353                 /* dont check nodes that are unavailable */
354                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
355                         continue;
356                 }
357
358                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
359                                          mem_ctx, &remote_dbmap);
360                 if (ret != 0) {
361                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
362                         return -1;
363                 }
364
365                 /* step through all databases on the remote node */
366                 for (db=0; db<remote_dbmap->num;db++) {
367                         const char *name;
368
369                         for (i=0;i<(*dbmap)->num;i++) {
370                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
371                                         break;
372                                 }
373                         }
374                         /* we already have this db locally */
375                         if (i!=(*dbmap)->num) {
376                                 continue;
377                         }
378                         /* ok so we need to create this database and
379                            rebuild dbmap
380                          */
381                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
382                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
383                         if (ret != 0) {
384                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
385                                           nodemap->nodes[j].pnn));
386                                 return -1;
387                         }
388                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
389                                            remote_dbmap->dbs[db].persistent);
390                         if (ret != 0) {
391                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
392                                 return -1;
393                         }
394                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
395                         if (ret != 0) {
396                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
397                                 return -1;
398                         }
399                 }
400         }
401
402         return 0;
403 }
404
405
406 /*
407   pull all the remote database contents into ours
408  */
409 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
410                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
411 {
412         int i, j, ret;
413
414         /* pull all records from all other nodes across onto this node
415            (this merges based on rsn)
416         */
417         for (i=0;i<dbmap->num;i++) {
418                 for (j=0; j<nodemap->num; j++) {
419                         /* we dont need to merge with ourselves */
420                         if (nodemap->nodes[j].pnn == pnn) {
421                                 continue;
422                         }
423                         /* dont merge from nodes that are unavailable */
424                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
425                                 continue;
426                         }
427                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
428                                                pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
429                         if (ret != 0) {
430                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
431                                           nodemap->nodes[j].pnn, pnn));
432                                 return -1;
433                         }
434                 }
435         }
436
437         return 0;
438 }
439
440
441 /*
442   change the dmaster on all databases to point to us
443  */
444 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
445                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
446 {
447         int i, j, ret;
448
449         /* update dmaster to point to this node for all databases/nodes */
450         for (i=0;i<dbmap->num;i++) {
451                 for (j=0; j<nodemap->num; j++) {
452                         /* dont repoint nodes that are unavailable */
453                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
454                                 continue;
455                         }
456                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
457                                                    ctdb, dbmap->dbs[i].dbid, pnn);
458                         if (ret != 0) {
459                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", 
460                                           nodemap->nodes[j].pnn, dbmap->dbs[i].dbid));
461                                 return -1;
462                         }
463                 }
464         }
465
466         return 0;
467 }
468
469
470 /*
471   update flags on all active nodes
472  */
473 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
474 {
475         int i;
476         for (i=0;i<nodemap->num;i++) {
477                 struct ctdb_node_flag_change c;
478                 TDB_DATA data;
479
480                 c.pnn = nodemap->nodes[i].pnn;
481                 c.old_flags = nodemap->nodes[i].flags;
482                 c.new_flags = nodemap->nodes[i].flags;
483
484                 data.dptr = (uint8_t *)&c;
485                 data.dsize = sizeof(c);
486
487                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
488                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
489
490         }
491         return 0;
492 }
493
494 /*
495   vacuum one database
496  */
497 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
498 {
499         uint64_t max_rsn;
500         int ret, i;
501
502         /* find max rsn on our local node for this db */
503         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
504         if (ret != 0) {
505                 return -1;
506         }
507
508         /* set rsn on non-empty records to max_rsn+1 */
509         for (i=0;i<nodemap->num;i++) {
510                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
511                         continue;
512                 }
513                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
514                                                  db_id, max_rsn+1);
515                 if (ret != 0) {
516                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
517                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
518                         return -1;
519                 }
520         }
521
522         /* delete records with rsn < max_rsn+1 on all nodes */
523         for (i=0;i<nodemap->num;i++) {
524                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
525                         continue;
526                 }
527                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
528                                                  db_id, max_rsn+1);
529                 if (ret != 0) {
530                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
531                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
532                         return -1;
533                 }
534         }
535
536
537         return 0;
538 }
539
540
541 /*
542   vacuum all attached databases
543  */
544 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
545                                 struct ctdb_dbid_map *dbmap)
546 {
547         int i;
548
549         /* update dmaster to point to this node for all databases/nodes */
550         for (i=0;i<dbmap->num;i++) {
551                 if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
552                         return -1;
553                 }
554         }
555         return 0;
556 }
557
558
559 /*
560   push out all our database contents to all other nodes
561  */
562 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
563                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
564 {
565         int i, j, ret;
566
567         /* push all records out to the nodes again */
568         for (i=0;i<dbmap->num;i++) {
569                 for (j=0; j<nodemap->num; j++) {
570                         /* we dont need to push to ourselves */
571                         if (nodemap->nodes[j].pnn == pnn) {
572                                 continue;
573                         }
574                         /* dont push to nodes that are unavailable */
575                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
576                                 continue;
577                         }
578                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
579                                                dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
580                         if (ret != 0) {
581                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
582                                           pnn, nodemap->nodes[j].pnn));
583                                 return -1;
584                         }
585                 }
586         }
587
588         return 0;
589 }
590
591
592 /*
593   ensure all nodes have the same vnnmap we do
594  */
595 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
596                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
597 {
598         int j, ret;
599
600         /* push the new vnn map out to all the nodes */
601         for (j=0; j<nodemap->num; j++) {
602                 /* dont push to nodes that are unavailable */
603                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
604                         continue;
605                 }
606
607                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
608                 if (ret != 0) {
609                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
610                         return -1;
611                 }
612         }
613
614         return 0;
615 }
616
617
618 /*
619   handler for when the admin bans a node
620 */
621 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
622                         TDB_DATA data, void *private_data)
623 {
624         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
625         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
626         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
627         uint32_t recmaster;
628         int ret;
629
630         if (data.dsize != sizeof(*b)) {
631                 DEBUG(0,("Bad data in ban_handler\n"));
632                 talloc_free(mem_ctx);
633                 return;
634         }
635
636         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
637         if (ret != 0) {
638                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
639                 talloc_free(mem_ctx);
640                 return;
641         }
642
643         if (recmaster != ctdb->pnn) {
644                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
645                 talloc_free(mem_ctx);
646                 return;
647         }
648
649         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
650                  b->pnn, b->ban_time));
651         ctdb_ban_node(rec, b->pnn, b->ban_time);
652         talloc_free(mem_ctx);
653 }
654
655 /*
656   handler for when the admin unbans a node
657 */
658 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
659                           TDB_DATA data, void *private_data)
660 {
661         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
662         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
663         uint32_t pnn;
664         int ret;
665         uint32_t recmaster;
666
667         if (data.dsize != sizeof(uint32_t)) {
668                 DEBUG(0,("Bad data in unban_handler\n"));
669                 talloc_free(mem_ctx);
670                 return;
671         }
672         pnn = *(uint32_t *)data.dptr;
673
674         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
675         if (ret != 0) {
676                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
677                 talloc_free(mem_ctx);
678                 return;
679         }
680
681         if (recmaster != ctdb->pnn) {
682                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
683                 talloc_free(mem_ctx);
684                 return;
685         }
686
687         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
688         ctdb_unban_node(rec, pnn);
689         talloc_free(mem_ctx);
690 }
691
692
693
694 /*
695   called when ctdb_wait_timeout should finish
696  */
697 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
698                               struct timeval yt, void *p)
699 {
700         uint32_t *timed_out = (uint32_t *)p;
701         (*timed_out) = 1;
702 }
703
704 /*
705   wait for a given number of seconds
706  */
707 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
708 {
709         uint32_t timed_out = 0;
710         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
711         while (!timed_out) {
712                 event_loop_once(ctdb->ev);
713         }
714 }
715
716
717 /*
718   update our local flags from all remote connected nodes. 
719  */
720 static int update_local_flags(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
721 {
722         int j;
723         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
724
725         /* get the nodemap for all active remote nodes and verify
726            they are the same as for this node
727          */
728         for (j=0; j<nodemap->num; j++) {
729                 struct ctdb_node_map *remote_nodemap=NULL;
730                 int ret;
731
732                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
733                         continue;
734                 }
735                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
736                         continue;
737                 }
738
739                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
740                                            mem_ctx, &remote_nodemap);
741                 if (ret != 0) {
742                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
743                                   nodemap->nodes[j].pnn));
744                         talloc_free(mem_ctx);
745                         return -1;
746                 }
747                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
748                         DEBUG(0,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
749                                  nodemap->nodes[j].pnn, nodemap->nodes[j].flags,
750                                  remote_nodemap->nodes[j].flags));
751                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
752                 }
753                 talloc_free(remote_nodemap);
754         }
755         talloc_free(mem_ctx);
756         return 0;
757 }
758
759
760 /* Create a new random generation ip. 
761    The generation id can not be the INVALID_GENERATION id
762 */
763 static uint32_t new_generation(void)
764 {
765         uint32_t generation;
766
767         while (1) {
768                 generation = random();
769
770                 if (generation != INVALID_GENERATION) {
771                         break;
772                 }
773         }
774
775         return generation;
776 }
777
778 /*
779   remember the trouble maker
780  */
781 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
782 {
783         struct ctdb_context *ctdb = rec->ctdb;
784
785         if (rec->last_culprit != culprit ||
786             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
787                 DEBUG(0,("New recovery culprit %u\n", culprit));
788                 /* either a new node is the culprit, or we've decide to forgive them */
789                 rec->last_culprit = culprit;
790                 rec->first_recover_time = timeval_current();
791                 rec->culprit_counter = 0;
792         }
793         rec->culprit_counter++;
794 }
795                 
796 /*
797   we are the recmaster, and recovery is needed - start a recovery run
798  */
799 static int do_recovery(struct ctdb_recoverd *rec, 
800                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
801                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
802                        uint32_t culprit)
803 {
804         struct ctdb_context *ctdb = rec->ctdb;
805         int i, j, ret;
806         uint32_t generation;
807         struct ctdb_dbid_map *dbmap;
808
809         /* if recovery fails, force it again */
810         rec->need_recovery = true;
811
812         ctdb_set_culprit(rec, culprit);
813
814         if (rec->culprit_counter > 2*nodemap->num) {
815                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
816                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
817                          ctdb->tunable.recovery_ban_period));
818                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
819         }
820
821         if (!ctdb_recovery_lock(ctdb, true)) {
822                 ctdb_set_culprit(rec, pnn);
823                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
824                 return -1;
825         }
826
827         /* set recovery mode to active on all nodes */
828         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
829         if (ret!=0) {
830                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
831                 return -1;
832         }
833
834         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
835
836         /* pick a new generation number */
837         generation = new_generation();
838
839         /* change the vnnmap on this node to use the new generation 
840            number but not on any other nodes.
841            this guarantees that if we abort the recovery prematurely
842            for some reason (a node stops responding?)
843            that we can just return immediately and we will reenter
844            recovery shortly again.
845            I.e. we deliberately leave the cluster with an inconsistent
846            generation id to allow us to abort recovery at any stage and
847            just restart it from scratch.
848          */
849         vnnmap->generation = generation;
850         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
851         if (ret != 0) {
852                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
853                 return -1;
854         }
855
856         /* get a list of all databases */
857         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
858         if (ret != 0) {
859                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
860                 return -1;
861         }
862
863
864
865         /* verify that all other nodes have all our databases */
866         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
867         if (ret != 0) {
868                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
869                 return -1;
870         }
871
872         /* verify that we have all the databases any other node has */
873         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
874         if (ret != 0) {
875                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
876                 return -1;
877         }
878
879
880
881         /* verify that all other nodes have all our databases */
882         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
883         if (ret != 0) {
884                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
885                 return -1;
886         }
887
888
889         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
890
891         /* pull all remote databases onto the local node */
892         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
893         if (ret != 0) {
894                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
895                 return -1;
896         }
897
898         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
899
900         /* push all local databases to the remote nodes */
901         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
902         if (ret != 0) {
903                 DEBUG(0, (__location__ " Unable to push local databases\n"));
904                 return -1;
905         }
906
907         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
908
909         /* build a new vnn map with all the currently active and
910            unbanned nodes */
911         generation = new_generation();
912         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
913         CTDB_NO_MEMORY(ctdb, vnnmap);
914         vnnmap->generation = generation;
915         vnnmap->size = num_active;
916         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
917         for (i=j=0;i<nodemap->num;i++) {
918                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
919                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
920                 }
921         }
922
923
924
925         /* update to the new vnnmap on all nodes */
926         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
927         if (ret != 0) {
928                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
929                 return -1;
930         }
931
932         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
933
934         /* update recmaster to point to us for all nodes */
935         ret = set_recovery_master(ctdb, nodemap, pnn);
936         if (ret!=0) {
937                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
938                 return -1;
939         }
940
941         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
942
943         /* repoint all local and remote database records to the local
944            node as being dmaster
945          */
946         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
947         if (ret != 0) {
948                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
949                 return -1;
950         }
951
952         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
953
954         /*
955           update all nodes to have the same flags that we have
956          */
957         ret = update_flags_on_all_nodes(ctdb, nodemap);
958         if (ret != 0) {
959                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
960                 return -1;
961         }
962         
963         DEBUG(1, (__location__ " Recovery - updated flags\n"));
964
965         /*
966           run a vacuum operation on empty records
967          */
968         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
969         if (ret != 0) {
970                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
971                 return -1;
972         }
973
974         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
975
976         /*
977           if enabled, tell nodes to takeover their public IPs
978          */
979         if (ctdb->vnn) {
980                 rec->need_takeover_run = false;
981                 ret = ctdb_takeover_run(ctdb, nodemap);
982                 if (ret != 0) {
983                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
984                         return -1;
985                 }
986                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
987         }
988
989         for (i=0;i<dbmap->num;i++) {
990                 DEBUG(0,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
991         }
992
993         /* disable recovery mode */
994         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
995         if (ret!=0) {
996                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
997                 return -1;
998         }
999
1000         /* send a message to all clients telling them that the cluster 
1001            has been reconfigured */
1002         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1003
1004         DEBUG(0, (__location__ " Recovery complete\n"));
1005
1006         rec->need_recovery = false;
1007
1008         /* We just finished a recovery successfully. 
1009            We now wait for rerecovery_timeout before we allow 
1010            another recovery to take place.
1011         */
1012         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1013         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1014         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1015
1016         return 0;
1017 }
1018
1019
1020 /*
1021   elections are won by first checking the number of connected nodes, then
1022   the priority time, then the pnn
1023  */
1024 struct election_message {
1025         uint32_t num_connected;
1026         struct timeval priority_time;
1027         uint32_t pnn;
1028         uint32_t node_flags;
1029 };
1030
1031 /*
1032   form this nodes election data
1033  */
1034 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1035 {
1036         int ret, i;
1037         struct ctdb_node_map *nodemap;
1038         struct ctdb_context *ctdb = rec->ctdb;
1039
1040         ZERO_STRUCTP(em);
1041
1042         em->pnn = rec->ctdb->pnn;
1043         em->priority_time = rec->priority_time;
1044         em->node_flags = rec->node_flags;
1045
1046         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1047         if (ret != 0) {
1048                 return;
1049         }
1050
1051         for (i=0;i<nodemap->num;i++) {
1052                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1053                         em->num_connected++;
1054                 }
1055         }
1056         talloc_free(nodemap);
1057 }
1058
1059 /*
1060   see if the given election data wins
1061  */
1062 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1063 {
1064         struct election_message myem;
1065         int cmp = 0;
1066
1067         ctdb_election_data(rec, &myem);
1068
1069         /* we cant win if we are banned */
1070         if (rec->node_flags & NODE_FLAGS_BANNED) {
1071                 return false;
1072         }       
1073
1074         /* we will automatically win if the other node is banned */
1075         if (em->node_flags & NODE_FLAGS_BANNED) {
1076                 return true;
1077         }
1078
1079         /* try to use the most connected node */
1080         if (cmp == 0) {
1081                 cmp = (int)myem.num_connected - (int)em->num_connected;
1082         }
1083
1084         /* then the longest running node */
1085         if (cmp == 0) {
1086                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1087         }
1088
1089         if (cmp == 0) {
1090                 cmp = (int)myem.pnn - (int)em->pnn;
1091         }
1092
1093         return cmp > 0;
1094 }
1095
1096 /*
1097   send out an election request
1098  */
1099 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1100 {
1101         int ret;
1102         TDB_DATA election_data;
1103         struct election_message emsg;
1104         uint64_t srvid;
1105         struct ctdb_context *ctdb = rec->ctdb;
1106
1107         srvid = CTDB_SRVID_RECOVERY;
1108
1109         ctdb_election_data(rec, &emsg);
1110
1111         election_data.dsize = sizeof(struct election_message);
1112         election_data.dptr  = (unsigned char *)&emsg;
1113
1114
1115         /* first we assume we will win the election and set 
1116            recoverymaster to be ourself on the current node
1117          */
1118         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1119         if (ret != 0) {
1120                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1121                 return -1;
1122         }
1123
1124
1125         /* send an election message to all active nodes */
1126         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1127
1128         return 0;
1129 }
1130
1131 /*
1132   this function will unban all nodes in the cluster
1133 */
1134 static void unban_all_nodes(struct ctdb_context *ctdb)
1135 {
1136         int ret, i;
1137         struct ctdb_node_map *nodemap;
1138         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1139         
1140         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1141         if (ret != 0) {
1142                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1143                 return;
1144         }
1145
1146         for (i=0;i<nodemap->num;i++) {
1147                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1148                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1149                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1150                 }
1151         }
1152
1153         talloc_free(tmp_ctx);
1154 }
1155
1156 /*
1157   handler for recovery master elections
1158 */
1159 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1160                              TDB_DATA data, void *private_data)
1161 {
1162         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1163         int ret;
1164         struct election_message *em = (struct election_message *)data.dptr;
1165         TALLOC_CTX *mem_ctx;
1166
1167         mem_ctx = talloc_new(ctdb);
1168
1169         /* someone called an election. check their election data
1170            and if we disagree and we would rather be the elected node, 
1171            send a new election message to all other nodes
1172          */
1173         if (ctdb_election_win(rec, em)) {
1174                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1175                 if (ret!=0) {
1176                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1177                 }
1178                 talloc_free(mem_ctx);
1179                 /*unban_all_nodes(ctdb);*/
1180                 return;
1181         }
1182
1183         /* release the recmaster lock */
1184         if (em->pnn != ctdb->pnn &&
1185             ctdb->recovery_lock_fd != -1) {
1186                 close(ctdb->recovery_lock_fd);
1187                 ctdb->recovery_lock_fd = -1;
1188                 unban_all_nodes(ctdb);
1189         }
1190
1191         /* ok, let that guy become recmaster then */
1192         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1193         if (ret != 0) {
1194                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1195                 talloc_free(mem_ctx);
1196                 return;
1197         }
1198
1199         /* release any bans */
1200         rec->last_culprit = (uint32_t)-1;
1201         talloc_free(rec->banned_nodes);
1202         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1203         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1204
1205         talloc_free(mem_ctx);
1206         return;
1207 }
1208
1209
1210 /*
1211   force the start of the election process
1212  */
1213 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1214                            struct ctdb_node_map *nodemap)
1215 {
1216         int ret;
1217         struct ctdb_context *ctdb = rec->ctdb;
1218
1219         /* set all nodes to recovery mode to stop all internode traffic */
1220         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1221         if (ret!=0) {
1222                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1223                 return;
1224         }
1225         
1226         ret = send_election_request(rec, mem_ctx, pnn);
1227         if (ret!=0) {
1228                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1229                 return;
1230         }
1231
1232         /* wait for a few seconds to collect all responses */
1233         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1234 }
1235
1236
1237
1238 /*
1239   handler for when a node changes its flags
1240 */
1241 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1242                             TDB_DATA data, void *private_data)
1243 {
1244         int ret;
1245         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1246         struct ctdb_node_map *nodemap=NULL;
1247         TALLOC_CTX *tmp_ctx;
1248         uint32_t changed_flags;
1249         int i;
1250         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1251
1252         if (data.dsize != sizeof(*c)) {
1253                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1254                 return;
1255         }
1256
1257         tmp_ctx = talloc_new(ctdb);
1258         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1259
1260         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1261
1262         for (i=0;i<nodemap->num;i++) {
1263                 if (nodemap->nodes[i].pnn == c->pnn) break;
1264         }
1265
1266         if (i == nodemap->num) {
1267                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1268                 talloc_free(tmp_ctx);
1269                 return;
1270         }
1271
1272         changed_flags = c->old_flags ^ c->new_flags;
1273
1274         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1275            This flag is handled locally based on whether the local node
1276            can communicate with the node or not.
1277         */
1278         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1279         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1280                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1281         }
1282
1283         if (nodemap->nodes[i].flags != c->new_flags) {
1284                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1285         }
1286
1287         nodemap->nodes[i].flags = c->new_flags;
1288
1289         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1290                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1291
1292         if (ret == 0) {
1293                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1294                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1295         }
1296         
1297         if (ret == 0 &&
1298             ctdb->recovery_master == ctdb->pnn &&
1299             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1300             ctdb->vnn) {
1301                 /* Only do the takeover run if the perm disabled or unhealthy
1302                    flags changed since these will cause an ip failover but not
1303                    a recovery.
1304                    If the node became disconnected or banned this will also
1305                    lead to an ip address failover but that is handled 
1306                    during recovery
1307                 */
1308                 if (changed_flags & NODE_FLAGS_DISABLED) {
1309                         rec->need_takeover_run = true;
1310                 }
1311         }
1312
1313         talloc_free(tmp_ctx);
1314 }
1315
1316
1317
1318 struct verify_recmode_normal_data {
1319         uint32_t count;
1320         enum monitor_result status;
1321 };
1322
1323 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1324 {
1325         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1326
1327
1328         /* one more node has responded with recmode data*/
1329         rmdata->count--;
1330
1331         /* if we failed to get the recmode, then return an error and let
1332            the main loop try again.
1333         */
1334         if (state->state != CTDB_CONTROL_DONE) {
1335                 if (rmdata->status == MONITOR_OK) {
1336                         rmdata->status = MONITOR_FAILED;
1337                 }
1338                 return;
1339         }
1340
1341         /* if we got a response, then the recmode will be stored in the
1342            status field
1343         */
1344         if (state->status != CTDB_RECOVERY_NORMAL) {
1345                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1346                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1347         }
1348
1349         return;
1350 }
1351
1352
1353 /* verify that all nodes are in normal recovery mode */
1354 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1355 {
1356         struct verify_recmode_normal_data *rmdata;
1357         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1358         struct ctdb_client_control_state *state;
1359         enum monitor_result status;
1360         int j;
1361         
1362         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1363         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1364         rmdata->count  = 0;
1365         rmdata->status = MONITOR_OK;
1366
1367         /* loop over all active nodes and send an async getrecmode call to 
1368            them*/
1369         for (j=0; j<nodemap->num; j++) {
1370                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1371                         continue;
1372                 }
1373                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1374                                         CONTROL_TIMEOUT(), 
1375                                         nodemap->nodes[j].pnn);
1376                 if (state == NULL) {
1377                         /* we failed to send the control, treat this as 
1378                            an error and try again next iteration
1379                         */                      
1380                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1381                         talloc_free(mem_ctx);
1382                         return MONITOR_FAILED;
1383                 }
1384
1385                 /* set up the callback functions */
1386                 state->async.fn = verify_recmode_normal_callback;
1387                 state->async.private_data = rmdata;
1388
1389                 /* one more control to wait for to complete */
1390                 rmdata->count++;
1391         }
1392
1393
1394         /* now wait for up to the maximum number of seconds allowed
1395            or until all nodes we expect a response from has replied
1396         */
1397         while (rmdata->count > 0) {
1398                 event_loop_once(ctdb->ev);
1399         }
1400
1401         status = rmdata->status;
1402         talloc_free(mem_ctx);
1403         return status;
1404 }
1405
1406
1407 struct verify_recmaster_data {
1408         uint32_t count;
1409         uint32_t pnn;
1410         enum monitor_result status;
1411 };
1412
1413 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1414 {
1415         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1416
1417
1418         /* one more node has responded with recmaster data*/
1419         rmdata->count--;
1420
1421         /* if we failed to get the recmaster, then return an error and let
1422            the main loop try again.
1423         */
1424         if (state->state != CTDB_CONTROL_DONE) {
1425                 if (rmdata->status == MONITOR_OK) {
1426                         rmdata->status = MONITOR_FAILED;
1427                 }
1428                 return;
1429         }
1430
1431         /* if we got a response, then the recmaster will be stored in the
1432            status field
1433         */
1434         if (state->status != rmdata->pnn) {
1435                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1436                 rmdata->status = MONITOR_ELECTION_NEEDED;
1437         }
1438
1439         return;
1440 }
1441
1442
1443 /* verify that all nodes agree that we are the recmaster */
1444 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1445 {
1446         struct verify_recmaster_data *rmdata;
1447         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1448         struct ctdb_client_control_state *state;
1449         enum monitor_result status;
1450         int j;
1451         
1452         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1453         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1454         rmdata->count  = 0;
1455         rmdata->pnn    = pnn;
1456         rmdata->status = MONITOR_OK;
1457
1458         /* loop over all active nodes and send an async getrecmaster call to 
1459            them*/
1460         for (j=0; j<nodemap->num; j++) {
1461                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1462                         continue;
1463                 }
1464                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1465                                         CONTROL_TIMEOUT(),
1466                                         nodemap->nodes[j].pnn);
1467                 if (state == NULL) {
1468                         /* we failed to send the control, treat this as 
1469                            an error and try again next iteration
1470                         */                      
1471                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1472                         talloc_free(mem_ctx);
1473                         return MONITOR_FAILED;
1474                 }
1475
1476                 /* set up the callback functions */
1477                 state->async.fn = verify_recmaster_callback;
1478                 state->async.private_data = rmdata;
1479
1480                 /* one more control to wait for to complete */
1481                 rmdata->count++;
1482         }
1483
1484
1485         /* now wait for up to the maximum number of seconds allowed
1486            or until all nodes we expect a response from has replied
1487         */
1488         while (rmdata->count > 0) {
1489                 event_loop_once(ctdb->ev);
1490         }
1491
1492         status = rmdata->status;
1493         talloc_free(mem_ctx);
1494         return status;
1495 }
1496
1497
1498 /*
1499   the main monitoring loop
1500  */
1501 static void monitor_cluster(struct ctdb_context *ctdb)
1502 {
1503         uint32_t pnn, num_active, recmaster;
1504         TALLOC_CTX *mem_ctx=NULL;
1505         struct ctdb_node_map *nodemap=NULL;
1506         struct ctdb_node_map *remote_nodemap=NULL;
1507         struct ctdb_vnn_map *vnnmap=NULL;
1508         struct ctdb_vnn_map *remote_vnnmap=NULL;
1509         int i, j, ret;
1510         struct ctdb_recoverd *rec;
1511         struct ctdb_all_public_ips *ips;
1512         char c;
1513
1514         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1515         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1516
1517         rec->ctdb = ctdb;
1518         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1519         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1520
1521         rec->priority_time = timeval_current();
1522
1523         /* register a message port for recovery elections */
1524         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1525
1526         /* and one for when nodes are disabled/enabled */
1527         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1528
1529         /* and one for when nodes are banned */
1530         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1531
1532         /* and one for when nodes are unbanned */
1533         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1534         
1535 again:
1536         if (mem_ctx) {
1537                 talloc_free(mem_ctx);
1538                 mem_ctx = NULL;
1539         }
1540         mem_ctx = talloc_new(ctdb);
1541         if (!mem_ctx) {
1542                 DEBUG(0,("Failed to create temporary context\n"));
1543                 exit(-1);
1544         }
1545
1546         /* we only check for recovery once every second */
1547         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1548
1549         /* get relevant tunables */
1550         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1551         if (ret != 0) {
1552                 DEBUG(0,("Failed to get tunables - retrying\n"));
1553                 goto again;
1554         }
1555
1556         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1557         if (pnn == (uint32_t)-1) {
1558                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1559                 goto again;
1560         }
1561
1562         /* get the vnnmap */
1563         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1564         if (ret != 0) {
1565                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1566                 goto again;
1567         }
1568
1569
1570         /* get number of nodes */
1571         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1572         if (ret != 0) {
1573                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1574                 goto again;
1575         }
1576
1577         /* remember our own node flags */
1578         rec->node_flags = nodemap->nodes[pnn].flags;
1579
1580         /* count how many active nodes there are */
1581         num_active = 0;
1582         for (i=0; i<nodemap->num; i++) {
1583                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1584                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1585                 } else {
1586                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1587                 }
1588                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1589                         num_active++;
1590                 }
1591         }
1592
1593
1594         /* check which node is the recovery master */
1595         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1596         if (ret != 0) {
1597                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1598                 goto again;
1599         }
1600
1601         if (recmaster == (uint32_t)-1) {
1602                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1603                 force_election(rec, mem_ctx, pnn, nodemap);
1604                 goto again;
1605         }
1606         
1607         /* verify that the recmaster node is still active */
1608         for (j=0; j<nodemap->num; j++) {
1609                 if (nodemap->nodes[j].pnn==recmaster) {
1610                         break;
1611                 }
1612         }
1613
1614         if (j == nodemap->num) {
1615                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1616                 force_election(rec, mem_ctx, pnn, nodemap);
1617                 goto again;
1618         }
1619
1620         /* if recovery master is disconnected we must elect a new recmaster */
1621         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1622                 DEBUG(0, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
1623                 force_election(rec, mem_ctx, pnn, nodemap);
1624                 goto again;
1625         }
1626
1627         /* grap the nodemap from the recovery master to check if it is banned */
1628         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1629                                    mem_ctx, &remote_nodemap);
1630         if (ret != 0) {
1631                 DEBUG(0, (__location__ " Unable to get nodemap from recovery master %u\n", 
1632                           nodemap->nodes[j].pnn));
1633                 goto again;
1634         }
1635
1636
1637         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1638                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1639                 force_election(rec, mem_ctx, pnn, nodemap);
1640                 goto again;
1641         }
1642
1643         /* verify that the public ip address allocation is consistent */
1644         if (ctdb->vnn != NULL) {
1645                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
1646                 if (ret != 0) {
1647                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
1648                         goto again;
1649                 }
1650                 for (j=0; j<ips->num; j++) {
1651                         /* verify that we have the ip addresses we should have
1652                            and we dont have ones we shouldnt have.
1653                            if we find an inconsistency we set recmode to
1654                            active on the local node and wait for the recmaster
1655                            to do a full blown recovery
1656                         */
1657                         if (ips->ips[j].pnn == pnn) {
1658                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
1659                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1660                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1661                                         if (ret != 0) {
1662                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1663                                                 goto again;
1664                                         }
1665                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1666                                         if (ret != 0) {
1667                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1668                                                 goto again;
1669                                         }
1670                                 }
1671                         } else {
1672                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
1673                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1674                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1675                                         if (ret != 0) {
1676                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1677                                                 goto again;
1678                                         }
1679                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1680                                         if (ret != 0) {
1681                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1682                                                 goto again;
1683                                         }
1684                                 }
1685                         }
1686                 }
1687         }
1688
1689         /* if we are not the recmaster then we do not need to check
1690            if recovery is needed
1691          */
1692         if (pnn != recmaster) {
1693                 goto again;
1694         }
1695
1696
1697         /* ensure our local copies of flags are right */
1698         ret = update_local_flags(ctdb, nodemap);
1699         if (ret != 0) {
1700                 DEBUG(0,("Unable to update local flags\n"));
1701                 goto again;
1702         }
1703
1704         /* update the list of public ips that a node can handle for
1705            all connected nodes
1706         */
1707         for (j=0; j<nodemap->num; j++) {
1708                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1709                         continue;
1710                 }
1711                 /* release any existing data */
1712                 if (ctdb->nodes[j]->public_ips) {
1713                         talloc_free(ctdb->nodes[j]->public_ips);
1714                         ctdb->nodes[j]->public_ips = NULL;
1715                 }
1716                 /* grab a new shiny list of public ips from the node */
1717                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1718                         ctdb->nodes[j]->pnn, 
1719                         ctdb->nodes,
1720                         &ctdb->nodes[j]->public_ips)) {
1721                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1722                                 ctdb->nodes[j]->pnn));
1723                         goto again;
1724                 }
1725         }
1726
1727
1728         /* verify that all active nodes agree that we are the recmaster */
1729         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1730         case MONITOR_RECOVERY_NEEDED:
1731                 /* can not happen */
1732                 goto again;
1733         case MONITOR_ELECTION_NEEDED:
1734                 force_election(rec, mem_ctx, pnn, nodemap);
1735                 goto again;
1736         case MONITOR_OK:
1737                 break;
1738         case MONITOR_FAILED:
1739                 goto again;
1740         }
1741
1742
1743         if (rec->need_recovery) {
1744                 /* a previous recovery didn't finish */
1745                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1746                 goto again;             
1747         }
1748
1749         /* verify that all active nodes are in normal mode 
1750            and not in recovery mode 
1751          */
1752         switch (verify_recmode(ctdb, nodemap)) {
1753         case MONITOR_RECOVERY_NEEDED:
1754                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1755                 goto again;
1756         case MONITOR_FAILED:
1757                 goto again;
1758         case MONITOR_ELECTION_NEEDED:
1759                 /* can not happen */
1760         case MONITOR_OK:
1761                 break;
1762         }
1763
1764
1765         /* we should have the reclock - check its not stale */
1766         if (ctdb->recovery_lock_fd == -1) {
1767                 DEBUG(0,("recovery master doesn't have the recovery lock\n"));
1768                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1769                 goto again;
1770         }
1771
1772         if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
1773                 DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
1774                 close(ctdb->recovery_lock_fd);
1775                 ctdb->recovery_lock_fd = -1;
1776                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1777                 goto again;
1778         }
1779
1780         /* get the nodemap for all active remote nodes and verify
1781            they are the same as for this node
1782          */
1783         for (j=0; j<nodemap->num; j++) {
1784                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1785                         continue;
1786                 }
1787                 if (nodemap->nodes[j].pnn == pnn) {
1788                         continue;
1789                 }
1790
1791                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1792                                            mem_ctx, &remote_nodemap);
1793                 if (ret != 0) {
1794                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1795                                   nodemap->nodes[j].pnn));
1796                         goto again;
1797                 }
1798
1799                 /* if the nodes disagree on how many nodes there are
1800                    then this is a good reason to try recovery
1801                  */
1802                 if (remote_nodemap->num != nodemap->num) {
1803                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1804                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1805                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1806                         goto again;
1807                 }
1808
1809                 /* if the nodes disagree on which nodes exist and are
1810                    active, then that is also a good reason to do recovery
1811                  */
1812                 for (i=0;i<nodemap->num;i++) {
1813                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1814                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1815                                           nodemap->nodes[j].pnn, i, 
1816                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1817                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1818                                             vnnmap, nodemap->nodes[j].pnn);
1819                                 goto again;
1820                         }
1821                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1822                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1823                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1824                                           nodemap->nodes[j].pnn, i,
1825                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1826                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1827                                             vnnmap, nodemap->nodes[j].pnn);
1828                                 goto again;
1829                         }
1830                 }
1831
1832         }
1833
1834
1835         /* there better be the same number of lmasters in the vnn map
1836            as there are active nodes or we will have to do a recovery
1837          */
1838         if (vnnmap->size != num_active) {
1839                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1840                           vnnmap->size, num_active));
1841                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1842                 goto again;
1843         }
1844
1845         /* verify that all active nodes in the nodemap also exist in 
1846            the vnnmap.
1847          */
1848         for (j=0; j<nodemap->num; j++) {
1849                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1850                         continue;
1851                 }
1852                 if (nodemap->nodes[j].pnn == pnn) {
1853                         continue;
1854                 }
1855
1856                 for (i=0; i<vnnmap->size; i++) {
1857                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1858                                 break;
1859                         }
1860                 }
1861                 if (i == vnnmap->size) {
1862                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1863                                   nodemap->nodes[j].pnn));
1864                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1865                         goto again;
1866                 }
1867         }
1868
1869         
1870         /* verify that all other nodes have the same vnnmap
1871            and are from the same generation
1872          */
1873         for (j=0; j<nodemap->num; j++) {
1874                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1875                         continue;
1876                 }
1877                 if (nodemap->nodes[j].pnn == pnn) {
1878                         continue;
1879                 }
1880
1881                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1882                                           mem_ctx, &remote_vnnmap);
1883                 if (ret != 0) {
1884                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1885                                   nodemap->nodes[j].pnn));
1886                         goto again;
1887                 }
1888
1889                 /* verify the vnnmap generation is the same */
1890                 if (vnnmap->generation != remote_vnnmap->generation) {
1891                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1892                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1893                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1894                         goto again;
1895                 }
1896
1897                 /* verify the vnnmap size is the same */
1898                 if (vnnmap->size != remote_vnnmap->size) {
1899                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1900                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1901                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1902                         goto again;
1903                 }
1904
1905                 /* verify the vnnmap is the same */
1906                 for (i=0;i<vnnmap->size;i++) {
1907                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1908                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1909                                           nodemap->nodes[j].pnn));
1910                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1911                                             vnnmap, nodemap->nodes[j].pnn);
1912                                 goto again;
1913                         }
1914                 }
1915         }
1916
1917         /* we might need to change who has what IP assigned */
1918         if (rec->need_takeover_run) {
1919                 rec->need_takeover_run = false;
1920                 ret = ctdb_takeover_run(ctdb, nodemap);
1921                 if (ret != 0) {
1922                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1923                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1924                                     vnnmap, ctdb->pnn);
1925                 }
1926         }
1927
1928         goto again;
1929
1930 }
1931
1932 /*
1933   event handler for when the main ctdbd dies
1934  */
1935 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1936                                  uint16_t flags, void *private_data)
1937 {
1938         DEBUG(0,("recovery daemon parent died - exiting\n"));
1939         _exit(1);
1940 }
1941
1942 /*
1943   startup the recovery daemon as a child of the main ctdb daemon
1944  */
1945 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1946 {
1947         int ret;
1948         int fd[2];
1949         pid_t child;
1950
1951         if (pipe(fd) != 0) {
1952                 return -1;
1953         }
1954
1955         child = fork();
1956         if (child == -1) {
1957                 return -1;
1958         }
1959         
1960         if (child != 0) {
1961                 close(fd[0]);
1962                 return 0;
1963         }
1964
1965         close(fd[1]);
1966
1967         /* shutdown the transport */
1968         ctdb->methods->shutdown(ctdb);
1969
1970         /* get a new event context */
1971         talloc_free(ctdb->ev);
1972         ctdb->ev = event_context_init(ctdb);
1973
1974         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1975                      ctdb_recoverd_parent, &fd[0]);     
1976
1977         close(ctdb->daemon.sd);
1978         ctdb->daemon.sd = -1;
1979
1980         srandom(getpid() ^ time(NULL));
1981
1982         /* initialise ctdb */
1983         ret = ctdb_socket_connect(ctdb);
1984         if (ret != 0) {
1985                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1986                 exit(1);
1987         }
1988
1989         monitor_cluster(ctdb);
1990
1991         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1992         return -1;
1993 }