update lib/replace from samba4
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "popt.h"
25 #include "cmdline.h"
26 #include "../include/ctdb.h"
27 #include "../include/ctdb_private.h"
28
29
30 struct ban_state {
31         struct ctdb_recoverd *rec;
32         uint32_t banned_node;
33 };
34
35 /*
36   private state of recovery daemon
37  */
38 struct ctdb_recoverd {
39         struct ctdb_context *ctdb;
40         uint32_t last_culprit;
41         uint32_t culprit_counter;
42         struct timeval first_recover_time;
43         struct ban_state **banned_nodes;
44         struct timeval priority_time;
45 };
46
47 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
48 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
49
50 /*
51   unban a node
52  */
53 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t vnn)
54 {
55         struct ctdb_context *ctdb = rec->ctdb;
56
57         if (!ctdb_validate_vnn(ctdb, vnn)) {
58                 DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn));
59                 return;
60         }
61
62         if (rec->banned_nodes[vnn] == NULL) {
63                 return;
64         }
65
66         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, 0, NODE_FLAGS_BANNED);
67
68         talloc_free(rec->banned_nodes[vnn]);
69         rec->banned_nodes[vnn] = NULL;
70 }
71
72
73 /*
74   called when a ban has timed out
75  */
76 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
77 {
78         struct ban_state *state = talloc_get_type(p, struct ban_state);
79         struct ctdb_recoverd *rec = state->rec;
80         uint32_t vnn = state->banned_node;
81
82         DEBUG(0,("Node %u is now unbanned\n", vnn));
83         ctdb_unban_node(rec, vnn);
84 }
85
86 /*
87   ban a node for a period of time
88  */
89 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t vnn, uint32_t ban_time)
90 {
91         struct ctdb_context *ctdb = rec->ctdb;
92
93         if (!ctdb_validate_vnn(ctdb, vnn)) {
94                 DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn));
95                 return;
96         }
97
98         if (vnn == ctdb->vnn) {
99                 DEBUG(0,("self ban - lowering our election priority\n"));
100                 /* banning ourselves - lower our election priority */
101                 rec->priority_time = timeval_current();
102         }
103
104         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, NODE_FLAGS_BANNED, 0);
105
106         rec->banned_nodes[vnn] = talloc(rec, struct ban_state);
107         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[vnn]);
108
109         rec->banned_nodes[vnn]->rec = rec;
110         rec->banned_nodes[vnn]->banned_node = vnn;
111
112         if (ban_time != 0) {
113                 event_add_timed(ctdb->ev, rec->banned_nodes[vnn], 
114                                 timeval_current_ofs(ban_time, 0),
115                                 ctdb_ban_timeout, rec->banned_nodes[vnn]);
116         }
117 }
118
119
120 /*
121   change recovery mode on all nodes
122  */
123 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
124 {
125         int j, ret;
126
127         /* start the freeze process immediately on all nodes */
128         ctdb_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, 
129                      CTDB_CONTROL_FREEZE, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
130                      NULL, NULL, NULL, NULL, NULL);
131
132         /* set recovery mode to active on all nodes */
133         for (j=0; j<nodemap->num; j++) {
134                 /* dont change it for nodes that are unavailable */
135                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
136                         continue;
137                 }
138
139                 if (rec_mode == CTDB_RECOVERY_ACTIVE) {
140                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn);
141                         if (ret != 0) {
142                                 DEBUG(0, (__location__ " Unable to freeze node %u\n", nodemap->nodes[j].vnn));
143                                 return -1;
144                         }
145                 }
146
147                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, rec_mode);
148                 if (ret != 0) {
149                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].vnn));
150                         return -1;
151                 }
152
153                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
154                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn);
155                         if (ret != 0) {
156                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].vnn));
157                                 return -1;
158                         }
159                 }
160         }
161
162         return 0;
163 }
164
165 /*
166   change recovery master on all node
167  */
168 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn)
169 {
170         int j, ret;
171
172         /* set recovery master to vnn on all nodes */
173         for (j=0; j<nodemap->num; j++) {
174                 /* dont change it for nodes that are unavailable */
175                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
176                         continue;
177                 }
178
179                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, vnn);
180                 if (ret != 0) {
181                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].vnn));
182                         return -1;
183                 }
184         }
185
186         return 0;
187 }
188
189
190 /*
191   ensure all other nodes have attached to any databases that we have
192  */
193 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
194                                            uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
195 {
196         int i, j, db, ret;
197         struct ctdb_dbid_map *remote_dbmap;
198
199         /* verify that all other nodes have all our databases */
200         for (j=0; j<nodemap->num; j++) {
201                 /* we dont need to ourself ourselves */
202                 if (nodemap->nodes[j].vnn == vnn) {
203                         continue;
204                 }
205                 /* dont check nodes that are unavailable */
206                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
207                         continue;
208                 }
209
210                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
211                                          mem_ctx, &remote_dbmap);
212                 if (ret != 0) {
213                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn));
214                         return -1;
215                 }
216
217                 /* step through all local databases */
218                 for (db=0; db<dbmap->num;db++) {
219                         const char *name;
220
221
222                         for (i=0;i<remote_dbmap->num;i++) {
223                                 if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
224                                         break;
225                                 }
226                         }
227                         /* the remote node already have this database */
228                         if (i!=remote_dbmap->num) {
229                                 continue;
230                         }
231                         /* ok so we need to create this database */
232                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), vnn, dbmap->dbids[db], mem_ctx, &name);
233                         if (ret != 0) {
234                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", vnn));
235                                 return -1;
236                         }
237                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, name);
238                         if (ret != 0) {
239                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
240                                 return -1;
241                         }
242                 }
243         }
244
245         return 0;
246 }
247
248
249 /*
250   ensure we are attached to any databases that anyone else is attached to
251  */
252 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
253                                           uint32_t vnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
254 {
255         int i, j, db, ret;
256         struct ctdb_dbid_map *remote_dbmap;
257
258         /* verify that we have all database any other node has */
259         for (j=0; j<nodemap->num; j++) {
260                 /* we dont need to ourself ourselves */
261                 if (nodemap->nodes[j].vnn == vnn) {
262                         continue;
263                 }
264                 /* dont check nodes that are unavailable */
265                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
266                         continue;
267                 }
268
269                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
270                                          mem_ctx, &remote_dbmap);
271                 if (ret != 0) {
272                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn));
273                         return -1;
274                 }
275
276                 /* step through all databases on the remote node */
277                 for (db=0; db<remote_dbmap->num;db++) {
278                         const char *name;
279
280                         for (i=0;i<(*dbmap)->num;i++) {
281                                 if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
282                                         break;
283                                 }
284                         }
285                         /* we already have this db locally */
286                         if (i!=(*dbmap)->num) {
287                                 continue;
288                         }
289                         /* ok so we need to create this database and
290                            rebuild dbmap
291                          */
292                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
293                                             remote_dbmap->dbids[db], mem_ctx, &name);
294                         if (ret != 0) {
295                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
296                                           nodemap->nodes[j].vnn));
297                                 return -1;
298                         }
299                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, name);
300                         if (ret != 0) {
301                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
302                                 return -1;
303                         }
304                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, dbmap);
305                         if (ret != 0) {
306                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", vnn));
307                                 return -1;
308                         }
309                 }
310         }
311
312         return 0;
313 }
314
315
316 /*
317   pull all the remote database contents into ours
318  */
319 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
320                                      uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
321 {
322         int i, j, ret;
323
324         /* pull all records from all other nodes across onto this node
325            (this merges based on rsn)
326         */
327         for (i=0;i<dbmap->num;i++) {
328                 for (j=0; j<nodemap->num; j++) {
329                         /* we dont need to merge with ourselves */
330                         if (nodemap->nodes[j].vnn == vnn) {
331                                 continue;
332                         }
333                         /* dont merge from nodes that are unavailable */
334                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
335                                 continue;
336                         }
337                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
338                                                vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
339                         if (ret != 0) {
340                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
341                                           nodemap->nodes[j].vnn, vnn));
342                                 return -1;
343                         }
344                 }
345         }
346
347         return 0;
348 }
349
350
351 /*
352   change the dmaster on all databases to point to us
353  */
354 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
355                                            uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int i, j, ret;
358
359         /* update dmaster to point to this node for all databases/nodes */
360         for (i=0;i<dbmap->num;i++) {
361                 for (j=0; j<nodemap->num; j++) {
362                         /* dont repoint nodes that are unavailable */
363                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
364                                 continue;
365                         }
366                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, ctdb, dbmap->dbids[i], vnn);
367                         if (ret != 0) {
368                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i]));
369                                 return -1;
370                         }
371                 }
372         }
373
374         return 0;
375 }
376
377
378 /*
379   update flags on all active nodes
380  */
381 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
382 {
383         int i;
384         for (i=0;i<nodemap->num;i++) {
385                 struct ctdb_node_flag_change c;
386                 TDB_DATA data;
387
388                 c.vnn = nodemap->nodes[i].vnn;
389                 c.flags = nodemap->nodes[i].flags;
390
391                 data.dptr = (uint8_t *)&c;
392                 data.dsize = sizeof(c);
393
394                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
395                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
396
397         }
398         return 0;
399 }
400
401 /*
402   vacuum one database
403  */
404 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
405 {
406         uint64_t max_rsn;
407         int ret, i;
408
409         /* find max rsn on our local node for this db */
410         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
411         if (ret != 0) {
412                 return -1;
413         }
414
415         /* set rsn on non-empty records to max_rsn+1 */
416         for (i=0;i<nodemap->num;i++) {
417                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
418                         continue;
419                 }
420                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
421                                                  db_id, max_rsn+1);
422                 if (ret != 0) {
423                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
424                                  nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1));
425                         return -1;
426                 }
427         }
428
429         /* delete records with rsn < max_rsn+1 on all nodes */
430         for (i=0;i<nodemap->num;i++) {
431                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
432                         continue;
433                 }
434                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
435                                                  db_id, max_rsn+1);
436                 if (ret != 0) {
437                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
438                                  nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1));
439                         return -1;
440                 }
441         }
442
443
444         return 0;
445 }
446
447
448 /*
449   vacuum all attached databases
450  */
451 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
452                                 struct ctdb_dbid_map *dbmap)
453 {
454         int i;
455
456         /* update dmaster to point to this node for all databases/nodes */
457         for (i=0;i<dbmap->num;i++) {
458                 if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
459                         return -1;
460                 }
461         }
462         return 0;
463 }
464
465
466 /*
467   push out all our database contents to all other nodes
468  */
469 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
470                                     uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
471 {
472         int i, j, ret;
473
474         /* push all records out to the nodes again */
475         for (i=0;i<dbmap->num;i++) {
476                 for (j=0; j<nodemap->num; j++) {
477                         /* we dont need to push to ourselves */
478                         if (nodemap->nodes[j].vnn == vnn) {
479                                 continue;
480                         }
481                         /* dont push to nodes that are unavailable */
482                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
483                                 continue;
484                         }
485                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), vnn, nodemap->nodes[j].vnn, 
486                                                dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
487                         if (ret != 0) {
488                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
489                                           vnn, nodemap->nodes[j].vnn));
490                                 return -1;
491                         }
492                 }
493         }
494
495         return 0;
496 }
497
498
499 /*
500   ensure all nodes have the same vnnmap we do
501  */
502 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
503                                       uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
504 {
505         int j, ret;
506
507         /* push the new vnn map out to all the nodes */
508         for (j=0; j<nodemap->num; j++) {
509                 /* dont push to nodes that are unavailable */
510                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
511                         continue;
512                 }
513
514                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, vnnmap);
515                 if (ret != 0) {
516                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn));
517                         return -1;
518                 }
519         }
520
521         return 0;
522 }
523
524
525 /*
526   handler for when the admin bans a node
527 */
528 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
529                         TDB_DATA data, void *private_data)
530 {
531         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
532         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
533         uint32_t recmaster;
534         int ret;
535
536         if (data.dsize != sizeof(*b)) {
537                 DEBUG(0,("Bad data in ban_handler\n"));
538                 return;
539         }
540
541         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
542         if (ret != 0) {
543                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
544                 return;
545         }
546
547         if (recmaster != ctdb->vnn) {
548                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
549                 return;
550         }
551
552         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
553                  b->vnn, b->ban_time));
554         ctdb_ban_node(rec, b->vnn, b->ban_time);
555 }
556
557 /*
558   handler for when the admin unbans a node
559 */
560 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
561                           TDB_DATA data, void *private_data)
562 {
563         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
564         uint32_t vnn;
565         int ret;
566         uint32_t recmaster;
567
568         if (data.dsize != sizeof(uint32_t)) {
569                 DEBUG(0,("Bad data in unban_handler\n"));
570                 return;
571         }
572         vnn = *(uint32_t *)data.dptr;
573
574         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
575         if (ret != 0) {
576                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
577                 return;
578         }
579
580         if (recmaster != ctdb->vnn) {
581                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
582                 return;
583         }
584
585         DEBUG(0,("Node %u has been unbanned by the administrator\n", vnn));
586         ctdb_unban_node(rec, vnn);
587 }
588
589
590
591 /*
592   called when ctdb_wait_timeout should finish
593  */
594 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
595                               struct timeval yt, void *p)
596 {
597         uint32_t *timed_out = (uint32_t *)p;
598         (*timed_out) = 1;
599 }
600
601 /*
602   wait for a given number of seconds
603  */
604 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
605 {
606         uint32_t timed_out = 0;
607         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
608         while (!timed_out) {
609                 event_loop_once(ctdb->ev);
610         }
611 }
612
613 /*
614   we are the recmaster, and recovery is needed - start a recovery run
615  */
616 static int do_recovery(struct ctdb_recoverd *rec, 
617                        TALLOC_CTX *mem_ctx, uint32_t vnn, uint32_t num_active,
618                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
619                        uint32_t culprit)
620 {
621         struct ctdb_context *ctdb = rec->ctdb;
622         int i, j, ret;
623         uint32_t generation;
624         struct ctdb_dbid_map *dbmap;
625
626         if (rec->last_culprit != culprit ||
627             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
628                 /* either a new node is the culprit, or we've decide to forgive them */
629                 rec->last_culprit = culprit;
630                 rec->first_recover_time = timeval_current();
631                 rec->culprit_counter = 0;
632         }
633         rec->culprit_counter++;
634
635         if (rec->culprit_counter > 2*nodemap->num) {
636                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
637                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
638                          ctdb->tunable.recovery_ban_period));
639                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
640         }
641
642         if (!ctdb_recovery_lock(ctdb, true)) {
643                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
644                 return -1;
645         }
646
647         /* set recovery mode to active on all nodes */
648         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
649         if (ret!=0) {
650                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
651                 return -1;
652         }
653
654         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
655
656         /* pick a new generation number */
657         generation = random();
658
659         /* change the vnnmap on this node to use the new generation 
660            number but not on any other nodes.
661            this guarantees that if we abort the recovery prematurely
662            for some reason (a node stops responding?)
663            that we can just return immediately and we will reenter
664            recovery shortly again.
665            I.e. we deliberately leave the cluster with an inconsistent
666            generation id to allow us to abort recovery at any stage and
667            just restart it from scratch.
668          */
669         vnnmap->generation = generation;
670         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, vnnmap);
671         if (ret != 0) {
672                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn));
673                 return -1;
674         }
675
676         /* get a list of all databases */
677         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &dbmap);
678         if (ret != 0) {
679                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", vnn));
680                 return -1;
681         }
682
683
684
685         /* verify that all other nodes have all our databases */
686         ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
687         if (ret != 0) {
688                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
689                 return -1;
690         }
691
692         /* verify that we have all the databases any other node has */
693         ret = create_missing_local_databases(ctdb, nodemap, vnn, &dbmap, mem_ctx);
694         if (ret != 0) {
695                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
696                 return -1;
697         }
698
699
700
701         /* verify that all other nodes have all our databases */
702         ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
703         if (ret != 0) {
704                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
705                 return -1;
706         }
707
708
709         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
710
711         /* pull all remote databases onto the local node */
712         ret = pull_all_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
713         if (ret != 0) {
714                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
715                 return -1;
716         }
717
718         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
719
720         /* push all local databases to the remote nodes */
721         ret = push_all_local_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
722         if (ret != 0) {
723                 DEBUG(0, (__location__ " Unable to push local databases\n"));
724                 return -1;
725         }
726
727         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
728
729         /* build a new vnn map with all the currently active and
730            unbanned nodes */
731         generation = random();
732         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
733         CTDB_NO_MEMORY(ctdb, vnnmap);
734         vnnmap->generation = generation;
735         vnnmap->size = num_active;
736         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
737         for (i=j=0;i<nodemap->num;i++) {
738                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
739                         vnnmap->map[j++] = nodemap->nodes[i].vnn;
740                 }
741         }
742
743
744
745         /* update to the new vnnmap on all nodes */
746         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, vnn, vnnmap, mem_ctx);
747         if (ret != 0) {
748                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
749                 return -1;
750         }
751
752         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
753
754         /* update recmaster to point to us for all nodes */
755         ret = set_recovery_master(ctdb, nodemap, vnn);
756         if (ret!=0) {
757                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
758                 return -1;
759         }
760
761         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
762
763         /* repoint all local and remote database records to the local
764            node as being dmaster
765          */
766         ret = update_dmaster_on_all_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
767         if (ret != 0) {
768                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
769                 return -1;
770         }
771
772         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
773
774         /*
775           update all nodes to have the same flags that we have
776          */
777         ret = update_flags_on_all_nodes(ctdb, nodemap);
778         if (ret != 0) {
779                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
780                 return -1;
781         }
782         
783         DEBUG(1, (__location__ " Recovery - updated flags\n"));
784
785         /*
786           run a vacuum operation on empty records
787          */
788         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
789         if (ret != 0) {
790                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
791                 return -1;
792         }
793
794         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
795
796         /*
797           if enabled, tell nodes to takeover their public IPs
798          */
799         if (ctdb->takeover.enabled) {
800                 ret = ctdb_takeover_run(ctdb, nodemap);
801                 if (ret != 0) {
802                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
803                         return -1;
804                 }
805                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
806         }
807
808
809         /* disable recovery mode */
810         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
811         if (ret!=0) {
812                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
813                 return -1;
814         }
815
816         /* send a message to all clients telling them that the cluster 
817            has been reconfigured */
818         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, CTDB_SRVID_RECONFIGURE, tdb_null);
819
820         DEBUG(0, (__location__ " Recovery complete\n"));
821
822         /* We just finished a recovery successfully. 
823            We now wait for rerecovery_timeout before we allow 
824            another recovery to take place.
825         */
826         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
827         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
828         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
829
830         return 0;
831 }
832
833
834 /*
835   elections are won by first checking the number of connected nodes, then
836   the priority time, then the vnn
837  */
838 struct election_message {
839         uint32_t num_connected;
840         struct timeval priority_time;
841         uint32_t vnn;
842 };
843
844 /*
845   form this nodes election data
846  */
847 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
848 {
849         int ret, i;
850         struct ctdb_node_map *nodemap;
851         struct ctdb_context *ctdb = rec->ctdb;
852
853         ZERO_STRUCTP(em);
854
855         em->vnn = rec->ctdb->vnn;
856         em->priority_time = rec->priority_time;
857
858         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
859         if (ret != 0) {
860                 return;
861         }
862
863         for (i=0;i<nodemap->num;i++) {
864                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
865                         em->num_connected++;
866                 }
867         }
868         talloc_free(nodemap);
869 }
870
871 /*
872   see if the given election data wins
873  */
874 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
875 {
876         struct election_message myem;
877         int cmp;
878
879         ctdb_election_data(rec, &myem);
880
881         /* try to use the most connected node */
882         cmp = (int)myem.num_connected - (int)em->num_connected;
883
884         /* then the longest running node */
885         if (cmp == 0) {
886                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
887         }
888
889         if (cmp == 0) {
890                 cmp = (int)myem.vnn - (int)em->vnn;
891         }
892
893         return cmp > 0;
894 }
895
896 /*
897   send out an election request
898  */
899 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn)
900 {
901         int ret;
902         TDB_DATA election_data;
903         struct election_message emsg;
904         uint64_t srvid;
905         struct ctdb_context *ctdb = rec->ctdb;
906         
907         srvid = CTDB_SRVID_RECOVERY;
908
909         ctdb_election_data(rec, &emsg);
910
911         election_data.dsize = sizeof(struct election_message);
912         election_data.dptr  = (unsigned char *)&emsg;
913
914
915         /* first we assume we will win the election and set 
916            recoverymaster to be ourself on the current node
917          */
918         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, vnn);
919         if (ret != 0) {
920                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
921                 return -1;
922         }
923
924
925         /* send an election message to all active nodes */
926         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
927
928         return 0;
929 }
930
931 /*
932   this function will unban all nodes in the cluster
933 */
934 static void unban_all_nodes(struct ctdb_context *ctdb)
935 {
936         int ret, i;
937         struct ctdb_node_map *nodemap;
938         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
939         
940         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
941         if (ret != 0) {
942                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
943                 return;
944         }
945
946         for (i=0;i<nodemap->num;i++) {
947                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
948                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
949                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn, 0, NODE_FLAGS_BANNED);
950                 }
951         }
952
953         talloc_free(tmp_ctx);
954 }
955
956 /*
957   handler for recovery master elections
958 */
959 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
960                              TDB_DATA data, void *private_data)
961 {
962         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
963         int ret;
964         struct election_message *em = (struct election_message *)data.dptr;
965         TALLOC_CTX *mem_ctx;
966
967         mem_ctx = talloc_new(ctdb);
968
969         /* someone called an election. check their election data
970            and if we disagree and we would rather be the elected node, 
971            send a new election message to all other nodes
972          */
973         if (ctdb_election_win(rec, em)) {
974                 ret = send_election_request(rec, mem_ctx, ctdb_get_vnn(ctdb));
975                 if (ret!=0) {
976                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
977                 }
978                 talloc_free(mem_ctx);
979                 /*unban_all_nodes(ctdb);*/
980                 return;
981         }
982
983         /* release the recmaster lock */
984         if (em->vnn != ctdb->vnn &&
985             ctdb->recovery_lock_fd != -1) {
986                 close(ctdb->recovery_lock_fd);
987                 ctdb->recovery_lock_fd = -1;
988                 unban_all_nodes(ctdb);
989         }
990
991         /* ok, let that guy become recmaster then */
992         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_vnn(ctdb), em->vnn);
993         if (ret != 0) {
994                 DEBUG(0, (__location__ " failed to send recmaster election request"));
995                 talloc_free(mem_ctx);
996                 return;
997         }
998
999         /* release any bans */
1000         rec->last_culprit = (uint32_t)-1;
1001         talloc_free(rec->banned_nodes);
1002         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1003         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1004
1005         talloc_free(mem_ctx);
1006         return;
1007 }
1008
1009
1010 /*
1011   force the start of the election process
1012  */
1013 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn, 
1014                            struct ctdb_node_map *nodemap)
1015 {
1016         int ret;
1017         struct ctdb_context *ctdb = rec->ctdb;
1018
1019         /* set all nodes to recovery mode to stop all internode traffic */
1020         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1021         if (ret!=0) {
1022                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1023                 return;
1024         }
1025         
1026         ret = send_election_request(rec, mem_ctx, vnn);
1027         if (ret!=0) {
1028                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1029                 return;
1030         }
1031
1032         /* wait for a few seconds to collect all responses */
1033         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1034 }
1035
1036
1037
1038 /*
1039   handler for when a node changes its flags
1040 */
1041 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1042                             TDB_DATA data, void *private_data)
1043 {
1044         int ret;
1045         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1046         struct ctdb_node_map *nodemap=NULL;
1047         TALLOC_CTX *tmp_ctx;
1048         int i;
1049
1050         if (data.dsize != sizeof(*c)) {
1051                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1052                 return;
1053         }
1054
1055         tmp_ctx = talloc_new(ctdb);
1056         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1057
1058         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1059
1060         for (i=0;i<nodemap->num;i++) {
1061                 if (nodemap->nodes[i].vnn == c->vnn) break;
1062         }
1063
1064         if (i == nodemap->num) {
1065                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->vnn));
1066                 talloc_free(tmp_ctx);
1067                 return;
1068         }
1069
1070         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1071            This flag is handled locally based on whether the local node
1072            can communicate with the node or not.
1073         */
1074         c->flags &= ~NODE_FLAGS_DISCONNECTED;
1075         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1076                 c->flags |= NODE_FLAGS_DISCONNECTED;
1077         }
1078
1079         if (nodemap->nodes[i].flags != c->flags) {
1080                 DEBUG(0,("Node %u has changed flags - now 0x%x\n", c->vnn, c->flags));
1081         }
1082
1083         nodemap->nodes[i].flags = c->flags;
1084
1085         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), 
1086                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1087
1088         if (ret == 0) {
1089                 ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), 
1090                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1091         }
1092         
1093         if (ret == 0 &&
1094             ctdb->recovery_master == ctdb->vnn &&
1095             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1096             ctdb->takeover.enabled) {
1097                 ret = ctdb_takeover_run(ctdb, nodemap);
1098                 if (ret != 0) {
1099                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1100                 }
1101         }
1102
1103         talloc_free(tmp_ctx);
1104 }
1105
1106
1107
1108 /*
1109   the main monitoring loop
1110  */
1111 static void monitor_cluster(struct ctdb_context *ctdb)
1112 {
1113         uint32_t vnn, num_active, recmode, recmaster;
1114         TALLOC_CTX *mem_ctx=NULL;
1115         struct ctdb_node_map *nodemap=NULL;
1116         struct ctdb_node_map *remote_nodemap=NULL;
1117         struct ctdb_vnn_map *vnnmap=NULL;
1118         struct ctdb_vnn_map *remote_vnnmap=NULL;
1119         int i, j, ret;
1120         bool need_takeover_run;
1121         struct ctdb_recoverd *rec;
1122
1123         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1124         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1125
1126         rec->ctdb = ctdb;
1127         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1128         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1129
1130         rec->priority_time = timeval_current();
1131
1132         /* register a message port for recovery elections */
1133         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1134
1135         /* and one for when nodes are disabled/enabled */
1136         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1137
1138         /* and one for when nodes are banned */
1139         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1140
1141         /* and one for when nodes are unbanned */
1142         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1143         
1144 again:
1145         need_takeover_run = false;
1146
1147         if (mem_ctx) {
1148                 talloc_free(mem_ctx);
1149                 mem_ctx = NULL;
1150         }
1151         mem_ctx = talloc_new(ctdb);
1152         if (!mem_ctx) {
1153                 DEBUG(0,("Failed to create temporary context\n"));
1154                 exit(-1);
1155         }
1156
1157         /* we only check for recovery once every second */
1158         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1159
1160         /* get relevant tunables */
1161         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1162         if (ret != 0) {
1163                 DEBUG(0,("Failed to get tunables - retrying\n"));
1164                 goto again;
1165         }
1166
1167         vnn = ctdb_ctrl_getvnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1168         if (vnn == (uint32_t)-1) {
1169                 DEBUG(0,("Failed to get local vnn - retrying\n"));
1170                 goto again;
1171         }
1172
1173         /* get the vnnmap */
1174         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &vnnmap);
1175         if (ret != 0) {
1176                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", vnn));
1177                 goto again;
1178         }
1179
1180
1181         /* get number of nodes */
1182         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &nodemap);
1183         if (ret != 0) {
1184                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", vnn));
1185                 goto again;
1186         }
1187
1188
1189         /* count how many active nodes there are */
1190         num_active = 0;
1191         for (i=0; i<nodemap->num; i++) {
1192                 if (rec->banned_nodes[nodemap->nodes[i].vnn] != NULL) {
1193                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1194                 } else {
1195                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1196                 }
1197                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1198                         num_active++;
1199                 }
1200         }
1201
1202
1203         /* check which node is the recovery master */
1204         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, &recmaster);
1205         if (ret != 0) {
1206                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn));
1207                 goto again;
1208         }
1209
1210         if (recmaster == (uint32_t)-1) {
1211                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1212                 force_election(rec, mem_ctx, vnn, nodemap);
1213                 goto again;
1214         }
1215         
1216         /* verify that the recmaster node is still active */
1217         for (j=0; j<nodemap->num; j++) {
1218                 if (nodemap->nodes[j].vnn==recmaster) {
1219                         break;
1220                 }
1221         }
1222
1223         if (j == nodemap->num) {
1224                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1225                 force_election(rec, mem_ctx, vnn, nodemap);
1226                 goto again;
1227         }
1228
1229         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1230                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].vnn));
1231                 force_election(rec, mem_ctx, vnn, nodemap);
1232                 goto again;
1233         }
1234         
1235
1236         /* if we are not the recmaster then we do not need to check
1237            if recovery is needed
1238          */
1239         if (vnn!=recmaster) {
1240                 goto again;
1241         }
1242
1243
1244         /* verify that all active nodes agree that we are the recmaster */
1245         for (j=0; j<nodemap->num; j++) {
1246                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1247                         continue;
1248                 }
1249                 if (nodemap->nodes[j].vnn == vnn) {
1250                         continue;
1251                 }
1252
1253                 ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmaster);
1254                 if (ret != 0) {
1255                         DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn));
1256                         goto again;
1257                 }
1258
1259                 if (recmaster!=vnn) {
1260                         DEBUG(0, ("Node %u does not agree we are the recmaster. Force reelection\n", 
1261                                   nodemap->nodes[j].vnn));
1262                         force_election(rec, mem_ctx, vnn, nodemap);
1263                         goto again;
1264                 }
1265         }
1266
1267
1268         /* verify that all active nodes are in normal mode 
1269            and not in recovery mode 
1270          */
1271         for (j=0; j<nodemap->num; j++) {
1272                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1273                         continue;
1274                 }
1275
1276                 ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmode);
1277                 if (ret != 0) {
1278                         DEBUG(0, ("Unable to get recmode from node %u\n", vnn));
1279                         goto again;
1280                 }
1281                 if (recmode != CTDB_RECOVERY_NORMAL) {
1282                         DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", 
1283                                   nodemap->nodes[j].vnn));
1284                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1285                         goto again;
1286                 }
1287         }
1288
1289
1290         /* get the nodemap for all active remote nodes and verify
1291            they are the same as for this node
1292          */
1293         for (j=0; j<nodemap->num; j++) {
1294                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1295                         continue;
1296                 }
1297                 if (nodemap->nodes[j].vnn == vnn) {
1298                         continue;
1299                 }
1300
1301                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
1302                                            mem_ctx, &remote_nodemap);
1303                 if (ret != 0) {
1304                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1305                                   nodemap->nodes[j].vnn));
1306                         goto again;
1307                 }
1308
1309                 /* if the nodes disagree on how many nodes there are
1310                    then this is a good reason to try recovery
1311                  */
1312                 if (remote_nodemap->num != nodemap->num) {
1313                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1314                                   nodemap->nodes[j].vnn, remote_nodemap->num, nodemap->num));
1315                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1316                         goto again;
1317                 }
1318
1319                 /* if the nodes disagree on which nodes exist and are
1320                    active, then that is also a good reason to do recovery
1321                  */
1322                 for (i=0;i<nodemap->num;i++) {
1323                         if (remote_nodemap->nodes[i].vnn != nodemap->nodes[i].vnn) {
1324                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap vnn for %d (%u vs %u).\n", 
1325                                           nodemap->nodes[j].vnn, i, 
1326                                           remote_nodemap->nodes[i].vnn, nodemap->nodes[i].vnn));
1327                                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
1328                                             vnnmap, nodemap->nodes[j].vnn);
1329                                 goto again;
1330                         }
1331                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1332                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1333                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1334                                           nodemap->nodes[j].vnn, i,
1335                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1336                                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
1337                                             vnnmap, nodemap->nodes[j].vnn);
1338                                 goto again;
1339                         }
1340                 }
1341
1342                 /* update our nodemap flags according to the other
1343                    server - this gets the NODE_FLAGS_DISABLED
1344                    flag. Note that the remote node is authoritative
1345                    for its flags (except CONNECTED, which we know
1346                    matches in this code) */
1347                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1348                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1349                         need_takeover_run = true;
1350                 }
1351         }
1352
1353
1354         /* there better be the same number of lmasters in the vnn map
1355            as there are active nodes or we will have to do a recovery
1356          */
1357         if (vnnmap->size != num_active) {
1358                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1359                           vnnmap->size, num_active));
1360                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, ctdb->vnn);
1361                 goto again;
1362         }
1363
1364         /* verify that all active nodes in the nodemap also exist in 
1365            the vnnmap.
1366          */
1367         for (j=0; j<nodemap->num; j++) {
1368                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1369                         continue;
1370                 }
1371                 if (nodemap->nodes[j].vnn == vnn) {
1372                         continue;
1373                 }
1374
1375                 for (i=0; i<vnnmap->size; i++) {
1376                         if (vnnmap->map[i] == nodemap->nodes[j].vnn) {
1377                                 break;
1378                         }
1379                 }
1380                 if (i == vnnmap->size) {
1381                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1382                                   nodemap->nodes[j].vnn));
1383                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1384                         goto again;
1385                 }
1386         }
1387
1388         
1389         /* verify that all other nodes have the same vnnmap
1390            and are from the same generation
1391          */
1392         for (j=0; j<nodemap->num; j++) {
1393                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1394                         continue;
1395                 }
1396                 if (nodemap->nodes[j].vnn == vnn) {
1397                         continue;
1398                 }
1399
1400                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
1401                                           mem_ctx, &remote_vnnmap);
1402                 if (ret != 0) {
1403                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1404                                   nodemap->nodes[j].vnn));
1405                         goto again;
1406                 }
1407
1408                 /* verify the vnnmap generation is the same */
1409                 if (vnnmap->generation != remote_vnnmap->generation) {
1410                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1411                                   nodemap->nodes[j].vnn, remote_vnnmap->generation, vnnmap->generation));
1412                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1413                         goto again;
1414                 }
1415
1416                 /* verify the vnnmap size is the same */
1417                 if (vnnmap->size != remote_vnnmap->size) {
1418                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1419                                   nodemap->nodes[j].vnn, remote_vnnmap->size, vnnmap->size));
1420                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1421                         goto again;
1422                 }
1423
1424                 /* verify the vnnmap is the same */
1425                 for (i=0;i<vnnmap->size;i++) {
1426                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1427                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1428                                           nodemap->nodes[j].vnn));
1429                                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
1430                                             vnnmap, nodemap->nodes[j].vnn);
1431                                 goto again;
1432                         }
1433                 }
1434         }
1435
1436         /* we might need to change who has what IP assigned */
1437         if (need_takeover_run && ctdb->takeover.enabled) {
1438                 ret = ctdb_takeover_run(ctdb, nodemap);
1439                 if (ret != 0) {
1440                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1441                 }
1442         }
1443
1444         goto again;
1445
1446 }
1447
1448 /*
1449   event handler for when the main ctdbd dies
1450  */
1451 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1452                                  uint16_t flags, void *private_data)
1453 {
1454         DEBUG(0,("recovery daemon parent died - exiting\n"));
1455         _exit(1);
1456 }
1457
1458
1459
1460 /*
1461   startup the recovery daemon as a child of the main ctdb daemon
1462  */
1463 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1464 {
1465         int ret;
1466         int fd[2];
1467         pid_t child;
1468
1469         if (pipe(fd) != 0) {
1470                 return -1;
1471         }
1472
1473         child = fork();
1474         if (child == -1) {
1475                 return -1;
1476         }
1477         
1478         if (child != 0) {
1479                 close(fd[0]);
1480                 return 0;
1481         }
1482
1483         close(fd[1]);
1484
1485         /* shutdown the transport */
1486         ctdb->methods->shutdown(ctdb);
1487
1488         /* get a new event context */
1489         talloc_free(ctdb->ev);
1490         ctdb->ev = event_context_init(ctdb);
1491
1492         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1493                      ctdb_recoverd_parent, &fd[0]);     
1494
1495         close(ctdb->daemon.sd);
1496         ctdb->daemon.sd = -1;
1497
1498         srandom(getpid() ^ time(NULL));
1499
1500         /* initialise ctdb */
1501         ret = ctdb_socket_connect(ctdb);
1502         if (ret != 0) {
1503                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1504                 exit(1);
1505         }
1506
1507         monitor_cluster(ctdb);
1508
1509         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1510         return -1;
1511 }