break checking that the recoverymode on all nodes are ok out into its
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "popt.h"
25 #include "cmdline.h"
26 #include "../include/ctdb.h"
27 #include "../include/ctdb_private.h"
28
29
30 struct ban_state {
31         struct ctdb_recoverd *rec;
32         uint32_t banned_node;
33 };
34
35 /*
36   private state of recovery daemon
37  */
38 struct ctdb_recoverd {
39         struct ctdb_context *ctdb;
40         uint32_t last_culprit;
41         uint32_t culprit_counter;
42         struct timeval first_recover_time;
43         struct ban_state **banned_nodes;
44         struct timeval priority_time;
45 };
46
47 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
48 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
49
50 /*
51   unban a node
52  */
53 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t vnn)
54 {
55         struct ctdb_context *ctdb = rec->ctdb;
56
57         if (!ctdb_validate_vnn(ctdb, vnn)) {
58                 DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn));
59                 return;
60         }
61
62         if (rec->banned_nodes[vnn] == NULL) {
63                 return;
64         }
65
66         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, 0, NODE_FLAGS_BANNED);
67
68         talloc_free(rec->banned_nodes[vnn]);
69         rec->banned_nodes[vnn] = NULL;
70 }
71
72
73 /*
74   called when a ban has timed out
75  */
76 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
77 {
78         struct ban_state *state = talloc_get_type(p, struct ban_state);
79         struct ctdb_recoverd *rec = state->rec;
80         uint32_t vnn = state->banned_node;
81
82         DEBUG(0,("Node %u is now unbanned\n", vnn));
83         ctdb_unban_node(rec, vnn);
84 }
85
86 /*
87   ban a node for a period of time
88  */
89 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t vnn, uint32_t ban_time)
90 {
91         struct ctdb_context *ctdb = rec->ctdb;
92
93         if (!ctdb_validate_vnn(ctdb, vnn)) {
94                 DEBUG(0,("Bad vnn %u in ctdb_ban_node\n", vnn));
95                 return;
96         }
97
98         if (vnn == ctdb->vnn) {
99                 DEBUG(0,("self ban - lowering our election priority\n"));
100                 /* banning ourselves - lower our election priority */
101                 rec->priority_time = timeval_current();
102         }
103
104         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), vnn, NODE_FLAGS_BANNED, 0);
105
106         rec->banned_nodes[vnn] = talloc(rec, struct ban_state);
107         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[vnn]);
108
109         rec->banned_nodes[vnn]->rec = rec;
110         rec->banned_nodes[vnn]->banned_node = vnn;
111
112         if (ban_time != 0) {
113                 event_add_timed(ctdb->ev, rec->banned_nodes[vnn], 
114                                 timeval_current_ofs(ban_time, 0),
115                                 ctdb_ban_timeout, rec->banned_nodes[vnn]);
116         }
117 }
118
119
120 /*
121   change recovery mode on all nodes
122  */
123 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
124 {
125         int j, ret;
126
127         /* start the freeze process immediately on all nodes */
128         ctdb_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, 
129                      CTDB_CONTROL_FREEZE, CTDB_CTRL_FLAG_NOREPLY, tdb_null, 
130                      NULL, NULL, NULL, NULL, NULL);
131
132         /* set recovery mode to active on all nodes */
133         for (j=0; j<nodemap->num; j++) {
134                 /* dont change it for nodes that are unavailable */
135                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
136                         continue;
137                 }
138
139                 if (rec_mode == CTDB_RECOVERY_ACTIVE) {
140                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn);
141                         if (ret != 0) {
142                                 DEBUG(0, (__location__ " Unable to freeze node %u\n", nodemap->nodes[j].vnn));
143                                 return -1;
144                         }
145                 }
146
147                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, rec_mode);
148                 if (ret != 0) {
149                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].vnn));
150                         return -1;
151                 }
152
153                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
154                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn);
155                         if (ret != 0) {
156                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].vnn));
157                                 return -1;
158                         }
159                 }
160         }
161
162         return 0;
163 }
164
165 /*
166   change recovery master on all node
167  */
168 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn)
169 {
170         int j, ret;
171
172         /* set recovery master to vnn on all nodes */
173         for (j=0; j<nodemap->num; j++) {
174                 /* dont change it for nodes that are unavailable */
175                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
176                         continue;
177                 }
178
179                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, vnn);
180                 if (ret != 0) {
181                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].vnn));
182                         return -1;
183                 }
184         }
185
186         return 0;
187 }
188
189
190 /*
191   ensure all other nodes have attached to any databases that we have
192  */
193 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
194                                            uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
195 {
196         int i, j, db, ret;
197         struct ctdb_dbid_map *remote_dbmap;
198
199         /* verify that all other nodes have all our databases */
200         for (j=0; j<nodemap->num; j++) {
201                 /* we dont need to ourself ourselves */
202                 if (nodemap->nodes[j].vnn == vnn) {
203                         continue;
204                 }
205                 /* dont check nodes that are unavailable */
206                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
207                         continue;
208                 }
209
210                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
211                                          mem_ctx, &remote_dbmap);
212                 if (ret != 0) {
213                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn));
214                         return -1;
215                 }
216
217                 /* step through all local databases */
218                 for (db=0; db<dbmap->num;db++) {
219                         const char *name;
220
221
222                         for (i=0;i<remote_dbmap->num;i++) {
223                                 if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
224                                         break;
225                                 }
226                         }
227                         /* the remote node already have this database */
228                         if (i!=remote_dbmap->num) {
229                                 continue;
230                         }
231                         /* ok so we need to create this database */
232                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), vnn, dbmap->dbids[db], mem_ctx, &name);
233                         if (ret != 0) {
234                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", vnn));
235                                 return -1;
236                         }
237                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, name);
238                         if (ret != 0) {
239                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
240                                 return -1;
241                         }
242                 }
243         }
244
245         return 0;
246 }
247
248
249 /*
250   ensure we are attached to any databases that anyone else is attached to
251  */
252 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
253                                           uint32_t vnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
254 {
255         int i, j, db, ret;
256         struct ctdb_dbid_map *remote_dbmap;
257
258         /* verify that we have all database any other node has */
259         for (j=0; j<nodemap->num; j++) {
260                 /* we dont need to ourself ourselves */
261                 if (nodemap->nodes[j].vnn == vnn) {
262                         continue;
263                 }
264                 /* dont check nodes that are unavailable */
265                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
266                         continue;
267                 }
268
269                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
270                                          mem_ctx, &remote_dbmap);
271                 if (ret != 0) {
272                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", vnn));
273                         return -1;
274                 }
275
276                 /* step through all databases on the remote node */
277                 for (db=0; db<remote_dbmap->num;db++) {
278                         const char *name;
279
280                         for (i=0;i<(*dbmap)->num;i++) {
281                                 if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
282                                         break;
283                                 }
284                         }
285                         /* we already have this db locally */
286                         if (i!=(*dbmap)->num) {
287                                 continue;
288                         }
289                         /* ok so we need to create this database and
290                            rebuild dbmap
291                          */
292                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
293                                             remote_dbmap->dbids[db], mem_ctx, &name);
294                         if (ret != 0) {
295                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
296                                           nodemap->nodes[j].vnn));
297                                 return -1;
298                         }
299                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, name);
300                         if (ret != 0) {
301                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
302                                 return -1;
303                         }
304                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, dbmap);
305                         if (ret != 0) {
306                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", vnn));
307                                 return -1;
308                         }
309                 }
310         }
311
312         return 0;
313 }
314
315
316 /*
317   pull all the remote database contents into ours
318  */
319 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
320                                      uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
321 {
322         int i, j, ret;
323
324         /* pull all records from all other nodes across onto this node
325            (this merges based on rsn)
326         */
327         for (i=0;i<dbmap->num;i++) {
328                 for (j=0; j<nodemap->num; j++) {
329                         /* we dont need to merge with ourselves */
330                         if (nodemap->nodes[j].vnn == vnn) {
331                                 continue;
332                         }
333                         /* dont merge from nodes that are unavailable */
334                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
335                                 continue;
336                         }
337                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
338                                                vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
339                         if (ret != 0) {
340                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
341                                           nodemap->nodes[j].vnn, vnn));
342                                 return -1;
343                         }
344                 }
345         }
346
347         return 0;
348 }
349
350
351 /*
352   change the dmaster on all databases to point to us
353  */
354 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
355                                            uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int i, j, ret;
358
359         /* update dmaster to point to this node for all databases/nodes */
360         for (i=0;i<dbmap->num;i++) {
361                 for (j=0; j<nodemap->num; j++) {
362                         /* dont repoint nodes that are unavailable */
363                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
364                                 continue;
365                         }
366                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, ctdb, dbmap->dbids[i], vnn);
367                         if (ret != 0) {
368                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i]));
369                                 return -1;
370                         }
371                 }
372         }
373
374         return 0;
375 }
376
377
378 /*
379   update flags on all active nodes
380  */
381 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
382 {
383         int i;
384         for (i=0;i<nodemap->num;i++) {
385                 struct ctdb_node_flag_change c;
386                 TDB_DATA data;
387
388                 c.vnn = nodemap->nodes[i].vnn;
389                 c.old_flags = nodemap->nodes[i].flags;
390                 c.new_flags = nodemap->nodes[i].flags;
391
392                 data.dptr = (uint8_t *)&c;
393                 data.dsize = sizeof(c);
394
395                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
396                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
397
398         }
399         return 0;
400 }
401
402 /*
403   vacuum one database
404  */
405 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
406 {
407         uint64_t max_rsn;
408         int ret, i;
409
410         /* find max rsn on our local node for this db */
411         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
412         if (ret != 0) {
413                 return -1;
414         }
415
416         /* set rsn on non-empty records to max_rsn+1 */
417         for (i=0;i<nodemap->num;i++) {
418                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
419                         continue;
420                 }
421                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
422                                                  db_id, max_rsn+1);
423                 if (ret != 0) {
424                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
425                                  nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1));
426                         return -1;
427                 }
428         }
429
430         /* delete records with rsn < max_rsn+1 on all nodes */
431         for (i=0;i<nodemap->num;i++) {
432                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
433                         continue;
434                 }
435                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
436                                                  db_id, max_rsn+1);
437                 if (ret != 0) {
438                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
439                                  nodemap->nodes[i].vnn, (unsigned long long)max_rsn+1));
440                         return -1;
441                 }
442         }
443
444
445         return 0;
446 }
447
448
449 /*
450   vacuum all attached databases
451  */
452 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
453                                 struct ctdb_dbid_map *dbmap)
454 {
455         int i;
456
457         /* update dmaster to point to this node for all databases/nodes */
458         for (i=0;i<dbmap->num;i++) {
459                 if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
460                         return -1;
461                 }
462         }
463         return 0;
464 }
465
466
467 /*
468   push out all our database contents to all other nodes
469  */
470 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
471                                     uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
472 {
473         int i, j, ret;
474
475         /* push all records out to the nodes again */
476         for (i=0;i<dbmap->num;i++) {
477                 for (j=0; j<nodemap->num; j++) {
478                         /* we dont need to push to ourselves */
479                         if (nodemap->nodes[j].vnn == vnn) {
480                                 continue;
481                         }
482                         /* dont push to nodes that are unavailable */
483                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
484                                 continue;
485                         }
486                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), vnn, nodemap->nodes[j].vnn, 
487                                                dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
488                         if (ret != 0) {
489                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
490                                           vnn, nodemap->nodes[j].vnn));
491                                 return -1;
492                         }
493                 }
494         }
495
496         return 0;
497 }
498
499
500 /*
501   ensure all nodes have the same vnnmap we do
502  */
503 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
504                                       uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
505 {
506         int j, ret;
507
508         /* push the new vnn map out to all the nodes */
509         for (j=0; j<nodemap->num; j++) {
510                 /* dont push to nodes that are unavailable */
511                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
512                         continue;
513                 }
514
515                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, mem_ctx, vnnmap);
516                 if (ret != 0) {
517                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn));
518                         return -1;
519                 }
520         }
521
522         return 0;
523 }
524
525
526 /*
527   handler for when the admin bans a node
528 */
529 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
530                         TDB_DATA data, void *private_data)
531 {
532         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
533         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
534         uint32_t recmaster;
535         int ret;
536
537         if (data.dsize != sizeof(*b)) {
538                 DEBUG(0,("Bad data in ban_handler\n"));
539                 return;
540         }
541
542         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
543         if (ret != 0) {
544                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
545                 return;
546         }
547
548         if (recmaster != ctdb->vnn) {
549                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
550                 return;
551         }
552
553         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
554                  b->vnn, b->ban_time));
555         ctdb_ban_node(rec, b->vnn, b->ban_time);
556 }
557
558 /*
559   handler for when the admin unbans a node
560 */
561 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
562                           TDB_DATA data, void *private_data)
563 {
564         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
565         uint32_t vnn;
566         int ret;
567         uint32_t recmaster;
568
569         if (data.dsize != sizeof(uint32_t)) {
570                 DEBUG(0,("Bad data in unban_handler\n"));
571                 return;
572         }
573         vnn = *(uint32_t *)data.dptr;
574
575         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
576         if (ret != 0) {
577                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
578                 return;
579         }
580
581         if (recmaster != ctdb->vnn) {
582                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
583                 return;
584         }
585
586         DEBUG(0,("Node %u has been unbanned by the administrator\n", vnn));
587         ctdb_unban_node(rec, vnn);
588 }
589
590
591
592 /*
593   called when ctdb_wait_timeout should finish
594  */
595 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
596                               struct timeval yt, void *p)
597 {
598         uint32_t *timed_out = (uint32_t *)p;
599         (*timed_out) = 1;
600 }
601
602 /*
603   wait for a given number of seconds
604  */
605 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
606 {
607         uint32_t timed_out = 0;
608         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
609         while (!timed_out) {
610                 event_loop_once(ctdb->ev);
611         }
612 }
613
614 /* Create a new random generation ip. 
615    The generation id can not be the INVALID_GENERATION id
616 */
617 static uint32_t new_generation(void)
618 {
619         uint32_t generation;
620
621         while (1) {
622                 generation = random();
623
624                 if (generation != INVALID_GENERATION) {
625                         break;
626                 }
627         }
628
629         return generation;
630 }
631                 
632 /*
633   we are the recmaster, and recovery is needed - start a recovery run
634  */
635 static int do_recovery(struct ctdb_recoverd *rec, 
636                        TALLOC_CTX *mem_ctx, uint32_t vnn, uint32_t num_active,
637                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
638                        uint32_t culprit)
639 {
640         struct ctdb_context *ctdb = rec->ctdb;
641         int i, j, ret;
642         uint32_t generation;
643         struct ctdb_dbid_map *dbmap;
644
645         if (rec->last_culprit != culprit ||
646             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
647                 /* either a new node is the culprit, or we've decide to forgive them */
648                 rec->last_culprit = culprit;
649                 rec->first_recover_time = timeval_current();
650                 rec->culprit_counter = 0;
651         }
652         rec->culprit_counter++;
653
654         if (rec->culprit_counter > 2*nodemap->num) {
655                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
656                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
657                          ctdb->tunable.recovery_ban_period));
658                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
659         }
660
661         if (!ctdb_recovery_lock(ctdb, true)) {
662                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
663                 return -1;
664         }
665
666         /* set recovery mode to active on all nodes */
667         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
668         if (ret!=0) {
669                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
670                 return -1;
671         }
672
673         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
674
675         /* pick a new generation number */
676         generation = new_generation();
677
678         /* change the vnnmap on this node to use the new generation 
679            number but not on any other nodes.
680            this guarantees that if we abort the recovery prematurely
681            for some reason (a node stops responding?)
682            that we can just return immediately and we will reenter
683            recovery shortly again.
684            I.e. we deliberately leave the cluster with an inconsistent
685            generation id to allow us to abort recovery at any stage and
686            just restart it from scratch.
687          */
688         vnnmap->generation = generation;
689         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, vnnmap);
690         if (ret != 0) {
691                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", vnn));
692                 return -1;
693         }
694
695         /* get a list of all databases */
696         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &dbmap);
697         if (ret != 0) {
698                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", vnn));
699                 return -1;
700         }
701
702
703
704         /* verify that all other nodes have all our databases */
705         ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
706         if (ret != 0) {
707                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
708                 return -1;
709         }
710
711         /* verify that we have all the databases any other node has */
712         ret = create_missing_local_databases(ctdb, nodemap, vnn, &dbmap, mem_ctx);
713         if (ret != 0) {
714                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
715                 return -1;
716         }
717
718
719
720         /* verify that all other nodes have all our databases */
721         ret = create_missing_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
722         if (ret != 0) {
723                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
724                 return -1;
725         }
726
727
728         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
729
730         /* pull all remote databases onto the local node */
731         ret = pull_all_remote_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
732         if (ret != 0) {
733                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
734                 return -1;
735         }
736
737         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
738
739         /* push all local databases to the remote nodes */
740         ret = push_all_local_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
741         if (ret != 0) {
742                 DEBUG(0, (__location__ " Unable to push local databases\n"));
743                 return -1;
744         }
745
746         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
747
748         /* build a new vnn map with all the currently active and
749            unbanned nodes */
750         generation = new_generation();
751         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
752         CTDB_NO_MEMORY(ctdb, vnnmap);
753         vnnmap->generation = generation;
754         vnnmap->size = num_active;
755         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
756         for (i=j=0;i<nodemap->num;i++) {
757                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
758                         vnnmap->map[j++] = nodemap->nodes[i].vnn;
759                 }
760         }
761
762
763
764         /* update to the new vnnmap on all nodes */
765         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, vnn, vnnmap, mem_ctx);
766         if (ret != 0) {
767                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
768                 return -1;
769         }
770
771         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
772
773         /* update recmaster to point to us for all nodes */
774         ret = set_recovery_master(ctdb, nodemap, vnn);
775         if (ret!=0) {
776                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
777                 return -1;
778         }
779
780         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
781
782         /* repoint all local and remote database records to the local
783            node as being dmaster
784          */
785         ret = update_dmaster_on_all_databases(ctdb, nodemap, vnn, dbmap, mem_ctx);
786         if (ret != 0) {
787                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
788                 return -1;
789         }
790
791         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
792
793         /*
794           update all nodes to have the same flags that we have
795          */
796         ret = update_flags_on_all_nodes(ctdb, nodemap);
797         if (ret != 0) {
798                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
799                 return -1;
800         }
801         
802         DEBUG(1, (__location__ " Recovery - updated flags\n"));
803
804         /*
805           run a vacuum operation on empty records
806          */
807         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
808         if (ret != 0) {
809                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
810                 return -1;
811         }
812
813         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
814
815         /*
816           if enabled, tell nodes to takeover their public IPs
817          */
818         if (ctdb->takeover.enabled) {
819                 ret = ctdb_takeover_run(ctdb, nodemap);
820                 if (ret != 0) {
821                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
822                         return -1;
823                 }
824                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
825         }
826
827
828         /* disable recovery mode */
829         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
830         if (ret!=0) {
831                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
832                 return -1;
833         }
834
835         /* send a message to all clients telling them that the cluster 
836            has been reconfigured */
837         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
838
839         DEBUG(0, (__location__ " Recovery complete\n"));
840
841         /* We just finished a recovery successfully. 
842            We now wait for rerecovery_timeout before we allow 
843            another recovery to take place.
844         */
845         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
846         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
847         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
848
849         return 0;
850 }
851
852
853 /*
854   elections are won by first checking the number of connected nodes, then
855   the priority time, then the vnn
856  */
857 struct election_message {
858         uint32_t num_connected;
859         struct timeval priority_time;
860         uint32_t vnn;
861 };
862
863 /*
864   form this nodes election data
865  */
866 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
867 {
868         int ret, i;
869         struct ctdb_node_map *nodemap;
870         struct ctdb_context *ctdb = rec->ctdb;
871
872         ZERO_STRUCTP(em);
873
874         em->vnn = rec->ctdb->vnn;
875         em->priority_time = rec->priority_time;
876
877         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
878         if (ret != 0) {
879                 return;
880         }
881
882         for (i=0;i<nodemap->num;i++) {
883                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
884                         em->num_connected++;
885                 }
886         }
887         talloc_free(nodemap);
888 }
889
890 /*
891   see if the given election data wins
892  */
893 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
894 {
895         struct election_message myem;
896         int cmp;
897
898         ctdb_election_data(rec, &myem);
899
900         /* try to use the most connected node */
901         cmp = (int)myem.num_connected - (int)em->num_connected;
902
903         /* then the longest running node */
904         if (cmp == 0) {
905                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
906         }
907
908         if (cmp == 0) {
909                 cmp = (int)myem.vnn - (int)em->vnn;
910         }
911
912         return cmp > 0;
913 }
914
915 /*
916   send out an election request
917  */
918 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn)
919 {
920         int ret;
921         TDB_DATA election_data;
922         struct election_message emsg;
923         uint64_t srvid;
924         struct ctdb_context *ctdb = rec->ctdb;
925         
926         srvid = CTDB_SRVID_RECOVERY;
927
928         ctdb_election_data(rec, &emsg);
929
930         election_data.dsize = sizeof(struct election_message);
931         election_data.dptr  = (unsigned char *)&emsg;
932
933
934         /* first we assume we will win the election and set 
935            recoverymaster to be ourself on the current node
936          */
937         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, vnn);
938         if (ret != 0) {
939                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
940                 return -1;
941         }
942
943
944         /* send an election message to all active nodes */
945         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
946
947         return 0;
948 }
949
950 /*
951   this function will unban all nodes in the cluster
952 */
953 static void unban_all_nodes(struct ctdb_context *ctdb)
954 {
955         int ret, i;
956         struct ctdb_node_map *nodemap;
957         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
958         
959         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
960         if (ret != 0) {
961                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
962                 return;
963         }
964
965         for (i=0;i<nodemap->num;i++) {
966                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
967                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
968                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn, 0, NODE_FLAGS_BANNED);
969                 }
970         }
971
972         talloc_free(tmp_ctx);
973 }
974
975 /*
976   handler for recovery master elections
977 */
978 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
979                              TDB_DATA data, void *private_data)
980 {
981         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
982         int ret;
983         struct election_message *em = (struct election_message *)data.dptr;
984         TALLOC_CTX *mem_ctx;
985
986         mem_ctx = talloc_new(ctdb);
987
988         /* someone called an election. check their election data
989            and if we disagree and we would rather be the elected node, 
990            send a new election message to all other nodes
991          */
992         if (ctdb_election_win(rec, em)) {
993                 ret = send_election_request(rec, mem_ctx, ctdb_get_vnn(ctdb));
994                 if (ret!=0) {
995                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
996                 }
997                 talloc_free(mem_ctx);
998                 /*unban_all_nodes(ctdb);*/
999                 return;
1000         }
1001
1002         /* release the recmaster lock */
1003         if (em->vnn != ctdb->vnn &&
1004             ctdb->recovery_lock_fd != -1) {
1005                 close(ctdb->recovery_lock_fd);
1006                 ctdb->recovery_lock_fd = -1;
1007                 unban_all_nodes(ctdb);
1008         }
1009
1010         /* ok, let that guy become recmaster then */
1011         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_vnn(ctdb), em->vnn);
1012         if (ret != 0) {
1013                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1014                 talloc_free(mem_ctx);
1015                 return;
1016         }
1017
1018         /* release any bans */
1019         rec->last_culprit = (uint32_t)-1;
1020         talloc_free(rec->banned_nodes);
1021         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1022         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1023
1024         talloc_free(mem_ctx);
1025         return;
1026 }
1027
1028
1029 /*
1030   force the start of the election process
1031  */
1032 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t vnn, 
1033                            struct ctdb_node_map *nodemap)
1034 {
1035         int ret;
1036         struct ctdb_context *ctdb = rec->ctdb;
1037
1038         /* set all nodes to recovery mode to stop all internode traffic */
1039         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1040         if (ret!=0) {
1041                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1042                 return;
1043         }
1044         
1045         ret = send_election_request(rec, mem_ctx, vnn);
1046         if (ret!=0) {
1047                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1048                 return;
1049         }
1050
1051         /* wait for a few seconds to collect all responses */
1052         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1053 }
1054
1055
1056
1057 /*
1058   handler for when a node changes its flags
1059 */
1060 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1061                             TDB_DATA data, void *private_data)
1062 {
1063         int ret;
1064         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1065         struct ctdb_node_map *nodemap=NULL;
1066         TALLOC_CTX *tmp_ctx;
1067         uint32_t changed_flags;
1068         int i;
1069
1070         if (data.dsize != sizeof(*c)) {
1071                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1072                 return;
1073         }
1074
1075         tmp_ctx = talloc_new(ctdb);
1076         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1077
1078         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1079
1080         for (i=0;i<nodemap->num;i++) {
1081                 if (nodemap->nodes[i].vnn == c->vnn) break;
1082         }
1083
1084         if (i == nodemap->num) {
1085                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->vnn));
1086                 talloc_free(tmp_ctx);
1087                 return;
1088         }
1089
1090         changed_flags = c->old_flags ^ c->new_flags;
1091
1092         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1093            This flag is handled locally based on whether the local node
1094            can communicate with the node or not.
1095         */
1096         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1097         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1098                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1099         }
1100
1101         if (nodemap->nodes[i].flags != c->new_flags) {
1102                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->vnn, c->new_flags, c->old_flags));
1103         }
1104
1105         nodemap->nodes[i].flags = c->new_flags;
1106
1107         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), 
1108                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1109
1110         if (ret == 0) {
1111                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1112                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1113         }
1114         
1115         if (ret == 0 &&
1116             ctdb->recovery_master == ctdb->vnn &&
1117             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1118             ctdb->takeover.enabled) {
1119                 /* Only do the takeover run if the perm disabled or unhealthy
1120                    flags changed since these will cause an ip failover but not
1121                    a recovery.
1122                    If the node became disconnected or banned this will also
1123                    lead to an ip address failover but that is handled 
1124                    during recovery
1125                 */
1126                 if (changed_flags & NODE_FLAGS_DISABLED) {
1127                         ret = ctdb_takeover_run(ctdb, nodemap);
1128                         if (ret != 0) {
1129                                 DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1130                         }
1131                         /* send a message to all clients telling them that the 
1132                            cluster has been reconfigured */
1133                         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1134                 }
1135         }
1136
1137         talloc_free(tmp_ctx);
1138 }
1139
1140
1141 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_FAILED};
1142
1143
1144 /* verify that all nodes are in recovery mode normal */
1145 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, TALLOC_CTX *mem_ctx)
1146 {
1147         struct ctdb_client_control_state **ctrl_states;
1148         uint32_t recmode;
1149         int j, ret;
1150         
1151         ctrl_states = talloc_array(mem_ctx, struct ctdb_client_control_state *,
1152                                  nodemap->num);
1153         if (!ctrl_states) {
1154                 DEBUG(0,(__location__ " Failed to allocate temporary ctrl state array\n"));
1155                 exit(-1);
1156         }
1157
1158
1159         /* loop over all active nodes and send an async getrecmode call to 
1160            them*/
1161         for (j=0; j<nodemap->num; j++) {
1162                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1163                         ctrl_states[j] = NULL;
1164                         continue;
1165                 }
1166                 ctrl_states[j] = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1167                                         CONTROL_TIMEOUT(), 
1168                                         nodemap->nodes[j].vnn);
1169         }
1170
1171         /* wait for the responses to come back and check that all is ok */
1172         for (j=0; j<nodemap->num; j++) {
1173                 if (ctrl_states[j] == NULL) {
1174                         continue;
1175                 }
1176                 ret = ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, ctrl_states[j], &recmode);
1177                 if (ret != 0) {
1178                         DEBUG(0, ("Unable to get recmode from node %u\n", nodemap->nodes[j].vnn));
1179                         talloc_free(ctrl_states);
1180                         return MONITOR_FAILED;
1181                 }
1182                 if (recmode != CTDB_RECOVERY_NORMAL) {
1183                         DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", nodemap->nodes[j].vnn));
1184                         talloc_free(ctrl_states);
1185                         return MONITOR_RECOVERY_NEEDED;
1186                 }
1187         }
1188
1189         talloc_free(ctrl_states);
1190         return MONITOR_OK;
1191 }
1192
1193
1194 /*
1195   the main monitoring loop
1196  */
1197 static void monitor_cluster(struct ctdb_context *ctdb)
1198 {
1199         uint32_t vnn, num_active, recmaster;
1200         TALLOC_CTX *mem_ctx=NULL;
1201         struct ctdb_node_map *nodemap=NULL;
1202         struct ctdb_node_map *remote_nodemap=NULL;
1203         struct ctdb_vnn_map *vnnmap=NULL;
1204         struct ctdb_vnn_map *remote_vnnmap=NULL;
1205         int i, j, ret;
1206         bool need_takeover_run;
1207         struct ctdb_recoverd *rec;
1208
1209         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1210         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1211
1212         rec->ctdb = ctdb;
1213         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1214         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1215
1216         rec->priority_time = timeval_current();
1217
1218         /* register a message port for recovery elections */
1219         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1220
1221         /* and one for when nodes are disabled/enabled */
1222         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1223
1224         /* and one for when nodes are banned */
1225         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1226
1227         /* and one for when nodes are unbanned */
1228         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1229         
1230 again:
1231         need_takeover_run = false;
1232
1233         if (mem_ctx) {
1234                 talloc_free(mem_ctx);
1235                 mem_ctx = NULL;
1236         }
1237         mem_ctx = talloc_new(ctdb);
1238         if (!mem_ctx) {
1239                 DEBUG(0,("Failed to create temporary context\n"));
1240                 exit(-1);
1241         }
1242
1243         /* we only check for recovery once every second */
1244         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1245
1246         /* get relevant tunables */
1247         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1248         if (ret != 0) {
1249                 DEBUG(0,("Failed to get tunables - retrying\n"));
1250                 goto again;
1251         }
1252
1253         vnn = ctdb_ctrl_getvnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1254         if (vnn == (uint32_t)-1) {
1255                 DEBUG(0,("Failed to get local vnn - retrying\n"));
1256                 goto again;
1257         }
1258
1259         /* get the vnnmap */
1260         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &vnnmap);
1261         if (ret != 0) {
1262                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", vnn));
1263                 goto again;
1264         }
1265
1266
1267         /* get number of nodes */
1268         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &nodemap);
1269         if (ret != 0) {
1270                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", vnn));
1271                 goto again;
1272         }
1273
1274
1275         /* count how many active nodes there are */
1276         num_active = 0;
1277         for (i=0; i<nodemap->num; i++) {
1278                 if (rec->banned_nodes[nodemap->nodes[i].vnn] != NULL) {
1279                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1280                 } else {
1281                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1282                 }
1283                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1284                         num_active++;
1285                 }
1286         }
1287
1288
1289         /* check which node is the recovery master */
1290         ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), vnn, &recmaster);
1291         if (ret != 0) {
1292                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn));
1293                 goto again;
1294         }
1295
1296         if (recmaster == (uint32_t)-1) {
1297                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1298                 force_election(rec, mem_ctx, vnn, nodemap);
1299                 goto again;
1300         }
1301         
1302         /* verify that the recmaster node is still active */
1303         for (j=0; j<nodemap->num; j++) {
1304                 if (nodemap->nodes[j].vnn==recmaster) {
1305                         break;
1306                 }
1307         }
1308
1309         if (j == nodemap->num) {
1310                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1311                 force_election(rec, mem_ctx, vnn, nodemap);
1312                 goto again;
1313         }
1314
1315         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1316                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].vnn));
1317                 force_election(rec, mem_ctx, vnn, nodemap);
1318                 goto again;
1319         }
1320         
1321
1322         /* if we are not the recmaster then we do not need to check
1323            if recovery is needed
1324          */
1325         if (vnn!=recmaster) {
1326                 goto again;
1327         }
1328
1329
1330         /* verify that all active nodes agree that we are the recmaster */
1331         for (j=0; j<nodemap->num; j++) {
1332                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1333                         continue;
1334                 }
1335                 if (nodemap->nodes[j].vnn == vnn) {
1336                         continue;
1337                 }
1338
1339                 ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, &recmaster);
1340                 if (ret != 0) {
1341                         DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", vnn));
1342                         goto again;
1343                 }
1344
1345                 if (recmaster!=vnn) {
1346                         DEBUG(0, ("Node %u does not agree we are the recmaster. Force reelection\n", 
1347                                   nodemap->nodes[j].vnn));
1348                         force_election(rec, mem_ctx, vnn, nodemap);
1349                         goto again;
1350                 }
1351         }
1352
1353
1354         /* verify that all active nodes are in normal mode 
1355            and not in recovery mode 
1356          */
1357         /* send a getrecmode call out to every node */
1358         switch (verify_recmode(ctdb, nodemap, mem_ctx)) {
1359         case MONITOR_RECOVERY_NEEDED:
1360                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1361                 goto again;
1362         case MONITOR_FAILED:
1363                 goto again;
1364         case MONITOR_OK:
1365                 break;
1366         }
1367
1368
1369
1370         /* get the nodemap for all active remote nodes and verify
1371            they are the same as for this node
1372          */
1373         for (j=0; j<nodemap->num; j++) {
1374                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1375                         continue;
1376                 }
1377                 if (nodemap->nodes[j].vnn == vnn) {
1378                         continue;
1379                 }
1380
1381                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
1382                                            mem_ctx, &remote_nodemap);
1383                 if (ret != 0) {
1384                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1385                                   nodemap->nodes[j].vnn));
1386                         goto again;
1387                 }
1388
1389                 /* if the nodes disagree on how many nodes there are
1390                    then this is a good reason to try recovery
1391                  */
1392                 if (remote_nodemap->num != nodemap->num) {
1393                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1394                                   nodemap->nodes[j].vnn, remote_nodemap->num, nodemap->num));
1395                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1396                         goto again;
1397                 }
1398
1399                 /* if the nodes disagree on which nodes exist and are
1400                    active, then that is also a good reason to do recovery
1401                  */
1402                 for (i=0;i<nodemap->num;i++) {
1403                         if (remote_nodemap->nodes[i].vnn != nodemap->nodes[i].vnn) {
1404                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap vnn for %d (%u vs %u).\n", 
1405                                           nodemap->nodes[j].vnn, i, 
1406                                           remote_nodemap->nodes[i].vnn, nodemap->nodes[i].vnn));
1407                                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
1408                                             vnnmap, nodemap->nodes[j].vnn);
1409                                 goto again;
1410                         }
1411                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1412                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1413                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1414                                           nodemap->nodes[j].vnn, i,
1415                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1416                                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
1417                                             vnnmap, nodemap->nodes[j].vnn);
1418                                 goto again;
1419                         }
1420                 }
1421
1422                 /* update our nodemap flags according to the other
1423                    server - this gets the NODE_FLAGS_DISABLED
1424                    flag. Note that the remote node is authoritative
1425                    for its flags (except CONNECTED, which we know
1426                    matches in this code) */
1427                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1428                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1429                         need_takeover_run = true;
1430                 }
1431         }
1432
1433
1434         /* there better be the same number of lmasters in the vnn map
1435            as there are active nodes or we will have to do a recovery
1436          */
1437         if (vnnmap->size != num_active) {
1438                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1439                           vnnmap->size, num_active));
1440                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, ctdb->vnn);
1441                 goto again;
1442         }
1443
1444         /* verify that all active nodes in the nodemap also exist in 
1445            the vnnmap.
1446          */
1447         for (j=0; j<nodemap->num; j++) {
1448                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1449                         continue;
1450                 }
1451                 if (nodemap->nodes[j].vnn == vnn) {
1452                         continue;
1453                 }
1454
1455                 for (i=0; i<vnnmap->size; i++) {
1456                         if (vnnmap->map[i] == nodemap->nodes[j].vnn) {
1457                                 break;
1458                         }
1459                 }
1460                 if (i == vnnmap->size) {
1461                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1462                                   nodemap->nodes[j].vnn));
1463                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1464                         goto again;
1465                 }
1466         }
1467
1468         
1469         /* verify that all other nodes have the same vnnmap
1470            and are from the same generation
1471          */
1472         for (j=0; j<nodemap->num; j++) {
1473                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1474                         continue;
1475                 }
1476                 if (nodemap->nodes[j].vnn == vnn) {
1477                         continue;
1478                 }
1479
1480                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, 
1481                                           mem_ctx, &remote_vnnmap);
1482                 if (ret != 0) {
1483                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1484                                   nodemap->nodes[j].vnn));
1485                         goto again;
1486                 }
1487
1488                 /* verify the vnnmap generation is the same */
1489                 if (vnnmap->generation != remote_vnnmap->generation) {
1490                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1491                                   nodemap->nodes[j].vnn, remote_vnnmap->generation, vnnmap->generation));
1492                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1493                         goto again;
1494                 }
1495
1496                 /* verify the vnnmap size is the same */
1497                 if (vnnmap->size != remote_vnnmap->size) {
1498                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1499                                   nodemap->nodes[j].vnn, remote_vnnmap->size, vnnmap->size));
1500                         do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
1501                         goto again;
1502                 }
1503
1504                 /* verify the vnnmap is the same */
1505                 for (i=0;i<vnnmap->size;i++) {
1506                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1507                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1508                                           nodemap->nodes[j].vnn));
1509                                 do_recovery(rec, mem_ctx, vnn, num_active, nodemap, 
1510                                             vnnmap, nodemap->nodes[j].vnn);
1511                                 goto again;
1512                         }
1513                 }
1514         }
1515
1516         /* we might need to change who has what IP assigned */
1517         if (need_takeover_run && ctdb->takeover.enabled) {
1518                 ret = ctdb_takeover_run(ctdb, nodemap);
1519                 if (ret != 0) {
1520                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
1521                 }
1522         }
1523
1524         goto again;
1525
1526 }
1527
1528 /*
1529   event handler for when the main ctdbd dies
1530  */
1531 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1532                                  uint16_t flags, void *private_data)
1533 {
1534         DEBUG(0,("recovery daemon parent died - exiting\n"));
1535         _exit(1);
1536 }
1537
1538
1539
1540 /*
1541   startup the recovery daemon as a child of the main ctdb daemon
1542  */
1543 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1544 {
1545         int ret;
1546         int fd[2];
1547         pid_t child;
1548
1549         if (pipe(fd) != 0) {
1550                 return -1;
1551         }
1552
1553         child = fork();
1554         if (child == -1) {
1555                 return -1;
1556         }
1557         
1558         if (child != 0) {
1559                 close(fd[0]);
1560                 return 0;
1561         }
1562
1563         close(fd[1]);
1564
1565         /* shutdown the transport */
1566         ctdb->methods->shutdown(ctdb);
1567
1568         /* get a new event context */
1569         talloc_free(ctdb->ev);
1570         ctdb->ev = event_context_init(ctdb);
1571
1572         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1573                      ctdb_recoverd_parent, &fd[0]);     
1574
1575         close(ctdb->daemon.sd);
1576         ctdb->daemon.sd = -1;
1577
1578         srandom(getpid() ^ time(NULL));
1579
1580         /* initialise ctdb */
1581         ret = ctdb_socket_connect(ctdb);
1582         if (ret != 0) {
1583                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1584                 exit(1);
1585         }
1586
1587         monitor_cluster(ctdb);
1588
1589         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1590         return -1;
1591 }