- merge from ronnie
[ambi/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "popt.h"
25 #include "cmdline.h"
26 #include "../include/ctdb.h"
27 #include "../include/ctdb_private.h"
28
29
30 struct ban_state {
31         struct ctdb_recoverd *rec;
32         uint32_t banned_node;
33 };
34
35 /*
36   private state of recovery daemon
37  */
38 struct ctdb_recoverd {
39         struct ctdb_context *ctdb;
40         uint32_t last_culprit;
41         uint32_t culprit_counter;
42         struct timeval first_recover_time;
43         struct ban_state **banned_nodes;
44         struct timeval priority_time;
45         bool need_takeover_run;
46         bool need_recovery;
47 };
48
49 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
50 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
51
52 /*
53   unban a node
54  */
55 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
56 {
57         struct ctdb_context *ctdb = rec->ctdb;
58
59         if (!ctdb_validate_pnn(ctdb, pnn)) {
60                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
61                 return;
62         }
63
64         if (rec->banned_nodes[pnn] == NULL) {
65                 return;
66         }
67
68         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
69
70         talloc_free(rec->banned_nodes[pnn]);
71         rec->banned_nodes[pnn] = NULL;
72 }
73
74
75 /*
76   called when a ban has timed out
77  */
78 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
79 {
80         struct ban_state *state = talloc_get_type(p, struct ban_state);
81         struct ctdb_recoverd *rec = state->rec;
82         uint32_t pnn = state->banned_node;
83
84         DEBUG(0,("Node %u is now unbanned\n", pnn));
85         ctdb_unban_node(rec, pnn);
86 }
87
88 /*
89   ban a node for a period of time
90  */
91 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
92 {
93         struct ctdb_context *ctdb = rec->ctdb;
94
95         if (!ctdb_validate_pnn(ctdb, pnn)) {
96                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
97                 return;
98         }
99
100         if (pnn == ctdb->pnn) {
101                 DEBUG(0,("self ban - lowering our election priority\n"));
102                 /* banning ourselves - lower our election priority */
103                 rec->priority_time = timeval_current();
104         }
105
106         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
107
108         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
109         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
110
111         rec->banned_nodes[pnn]->rec = rec;
112         rec->banned_nodes[pnn]->banned_node = pnn;
113
114         if (ban_time != 0) {
115                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
116                                 timeval_current_ofs(ban_time, 0),
117                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
118         }
119 }
120
121 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
122
123
124 struct freeze_node_data {
125         uint32_t count;
126         enum monitor_result status;
127 };
128
129
130 static void freeze_node_callback(struct ctdb_client_control_state *state)
131 {
132         struct freeze_node_data *fndata = talloc_get_type(state->async.private, struct freeze_node_data);
133
134
135         /* one more node has responded to our freeze node*/
136         fndata->count--;
137
138         /* if we failed to freeze the node, we must trigger another recovery */
139         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
140                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
141                 fndata->status = MONITOR_RECOVERY_NEEDED;
142         }
143
144         return;
145 }
146
147
148
149 /* freeze all nodes */
150 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
151 {
152         struct freeze_node_data *fndata;
153         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
154         struct ctdb_client_control_state *state;
155         enum monitor_result status;
156         int j;
157         
158         fndata = talloc(mem_ctx, struct freeze_node_data);
159         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
160         fndata->count  = 0;
161         fndata->status = MONITOR_OK;
162
163         /* loop over all active nodes and send an async freeze call to 
164            them*/
165         for (j=0; j<nodemap->num; j++) {
166                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
167                         continue;
168                 }
169                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
170                                         CONTROL_TIMEOUT(), 
171                                         nodemap->nodes[j].pnn);
172                 if (state == NULL) {
173                         /* we failed to send the control, treat this as 
174                            an error and try again next iteration
175                         */                      
176                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
177                         talloc_free(mem_ctx);
178                         return MONITOR_RECOVERY_NEEDED;
179                 }
180
181                 /* set up the callback functions */
182                 state->async.fn = freeze_node_callback;
183                 state->async.private = fndata;
184
185                 /* one more control to wait for to complete */
186                 fndata->count++;
187         }
188
189
190         /* now wait for up to the maximum number of seconds allowed
191            or until all nodes we expect a response from has replied
192         */
193         while (fndata->count > 0) {
194                 event_loop_once(ctdb->ev);
195         }
196
197         status = fndata->status;
198         talloc_free(mem_ctx);
199         return status;
200 }
201
202
203 /*
204   change recovery mode on all nodes
205  */
206 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
207 {
208         int j, ret;
209
210         /* freeze all nodes */
211         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
212                 ret = freeze_all_nodes(ctdb, nodemap);
213                 if (ret != MONITOR_OK) {
214                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
215                         return -1;
216                 }
217         }
218
219
220         /* set recovery mode to active on all nodes */
221         for (j=0; j<nodemap->num; j++) {
222                 /* dont change it for nodes that are unavailable */
223                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
224                         continue;
225                 }
226
227                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
228                 if (ret != 0) {
229                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
230                         return -1;
231                 }
232
233                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
234                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
235                         if (ret != 0) {
236                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
237                                 return -1;
238                         }
239                 }
240         }
241
242         return 0;
243 }
244
245 /*
246   change recovery master on all node
247  */
248 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
249 {
250         int j, ret;
251
252         /* set recovery master to pnn on all nodes */
253         for (j=0; j<nodemap->num; j++) {
254                 /* dont change it for nodes that are unavailable */
255                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
256                         continue;
257                 }
258
259                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
260                 if (ret != 0) {
261                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
262                         return -1;
263                 }
264         }
265
266         return 0;
267 }
268
269
270 /*
271   ensure all other nodes have attached to any databases that we have
272  */
273 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
274                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
275 {
276         int i, j, db, ret;
277         struct ctdb_dbid_map *remote_dbmap;
278
279         /* verify that all other nodes have all our databases */
280         for (j=0; j<nodemap->num; j++) {
281                 /* we dont need to ourself ourselves */
282                 if (nodemap->nodes[j].pnn == pnn) {
283                         continue;
284                 }
285                 /* dont check nodes that are unavailable */
286                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
287                         continue;
288                 }
289
290                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
291                                          mem_ctx, &remote_dbmap);
292                 if (ret != 0) {
293                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
294                         return -1;
295                 }
296
297                 /* step through all local databases */
298                 for (db=0; db<dbmap->num;db++) {
299                         const char *name;
300
301
302                         for (i=0;i<remote_dbmap->num;i++) {
303                                 if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
304                                         break;
305                                 }
306                         }
307                         /* the remote node already have this database */
308                         if (i!=remote_dbmap->num) {
309                                 continue;
310                         }
311                         /* ok so we need to create this database */
312                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
313                         if (ret != 0) {
314                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
315                                 return -1;
316                         }
317                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
318                         if (ret != 0) {
319                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
320                                 return -1;
321                         }
322                 }
323         }
324
325         return 0;
326 }
327
328
329 /*
330   ensure we are attached to any databases that anyone else is attached to
331  */
332 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
333                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
334 {
335         int i, j, db, ret;
336         struct ctdb_dbid_map *remote_dbmap;
337
338         /* verify that we have all database any other node has */
339         for (j=0; j<nodemap->num; j++) {
340                 /* we dont need to ourself ourselves */
341                 if (nodemap->nodes[j].pnn == pnn) {
342                         continue;
343                 }
344                 /* dont check nodes that are unavailable */
345                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
346                         continue;
347                 }
348
349                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
350                                          mem_ctx, &remote_dbmap);
351                 if (ret != 0) {
352                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
353                         return -1;
354                 }
355
356                 /* step through all databases on the remote node */
357                 for (db=0; db<remote_dbmap->num;db++) {
358                         const char *name;
359
360                         for (i=0;i<(*dbmap)->num;i++) {
361                                 if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
362                                         break;
363                                 }
364                         }
365                         /* we already have this db locally */
366                         if (i!=(*dbmap)->num) {
367                                 continue;
368                         }
369                         /* ok so we need to create this database and
370                            rebuild dbmap
371                          */
372                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
373                                             remote_dbmap->dbids[db], mem_ctx, &name);
374                         if (ret != 0) {
375                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
376                                           nodemap->nodes[j].pnn));
377                                 return -1;
378                         }
379                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
380                         if (ret != 0) {
381                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
382                                 return -1;
383                         }
384                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
385                         if (ret != 0) {
386                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
387                                 return -1;
388                         }
389                 }
390         }
391
392         return 0;
393 }
394
395
396 /*
397   pull all the remote database contents into ours
398  */
399 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
400                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
401 {
402         int i, j, ret;
403
404         /* pull all records from all other nodes across onto this node
405            (this merges based on rsn)
406         */
407         for (i=0;i<dbmap->num;i++) {
408                 for (j=0; j<nodemap->num; j++) {
409                         /* we dont need to merge with ourselves */
410                         if (nodemap->nodes[j].pnn == pnn) {
411                                 continue;
412                         }
413                         /* dont merge from nodes that are unavailable */
414                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
415                                 continue;
416                         }
417                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
418                                                pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
419                         if (ret != 0) {
420                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
421                                           nodemap->nodes[j].pnn, pnn));
422                                 return -1;
423                         }
424                 }
425         }
426
427         return 0;
428 }
429
430
431 /*
432   change the dmaster on all databases to point to us
433  */
434 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
435                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
436 {
437         int i, j, ret;
438
439         /* update dmaster to point to this node for all databases/nodes */
440         for (i=0;i<dbmap->num;i++) {
441                 for (j=0; j<nodemap->num; j++) {
442                         /* dont repoint nodes that are unavailable */
443                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
444                                 continue;
445                         }
446                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
447                         if (ret != 0) {
448                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
449                                 return -1;
450                         }
451                 }
452         }
453
454         return 0;
455 }
456
457
458 /*
459   update flags on all active nodes
460  */
461 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
462 {
463         int i;
464         for (i=0;i<nodemap->num;i++) {
465                 struct ctdb_node_flag_change c;
466                 TDB_DATA data;
467
468                 c.pnn = nodemap->nodes[i].pnn;
469                 c.old_flags = nodemap->nodes[i].flags;
470                 c.new_flags = nodemap->nodes[i].flags;
471
472                 data.dptr = (uint8_t *)&c;
473                 data.dsize = sizeof(c);
474
475                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
476                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
477
478         }
479         return 0;
480 }
481
482 /*
483   vacuum one database
484  */
485 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
486 {
487         uint64_t max_rsn;
488         int ret, i;
489
490         /* find max rsn on our local node for this db */
491         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
492         if (ret != 0) {
493                 return -1;
494         }
495
496         /* set rsn on non-empty records to max_rsn+1 */
497         for (i=0;i<nodemap->num;i++) {
498                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
499                         continue;
500                 }
501                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
502                                                  db_id, max_rsn+1);
503                 if (ret != 0) {
504                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
505                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
506                         return -1;
507                 }
508         }
509
510         /* delete records with rsn < max_rsn+1 on all nodes */
511         for (i=0;i<nodemap->num;i++) {
512                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
513                         continue;
514                 }
515                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
516                                                  db_id, max_rsn+1);
517                 if (ret != 0) {
518                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
519                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
520                         return -1;
521                 }
522         }
523
524
525         return 0;
526 }
527
528
529 /*
530   vacuum all attached databases
531  */
532 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
533                                 struct ctdb_dbid_map *dbmap)
534 {
535         int i;
536
537         /* update dmaster to point to this node for all databases/nodes */
538         for (i=0;i<dbmap->num;i++) {
539                 if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
540                         return -1;
541                 }
542         }
543         return 0;
544 }
545
546
547 /*
548   push out all our database contents to all other nodes
549  */
550 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
551                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
552 {
553         int i, j, ret;
554
555         /* push all records out to the nodes again */
556         for (i=0;i<dbmap->num;i++) {
557                 for (j=0; j<nodemap->num; j++) {
558                         /* we dont need to push to ourselves */
559                         if (nodemap->nodes[j].pnn == pnn) {
560                                 continue;
561                         }
562                         /* dont push to nodes that are unavailable */
563                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
564                                 continue;
565                         }
566                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
567                                                dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
568                         if (ret != 0) {
569                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
570                                           pnn, nodemap->nodes[j].pnn));
571                                 return -1;
572                         }
573                 }
574         }
575
576         return 0;
577 }
578
579
580 /*
581   ensure all nodes have the same vnnmap we do
582  */
583 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
584                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
585 {
586         int j, ret;
587
588         /* push the new vnn map out to all the nodes */
589         for (j=0; j<nodemap->num; j++) {
590                 /* dont push to nodes that are unavailable */
591                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
592                         continue;
593                 }
594
595                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
596                 if (ret != 0) {
597                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
598                         return -1;
599                 }
600         }
601
602         return 0;
603 }
604
605
606 /*
607   handler for when the admin bans a node
608 */
609 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
610                         TDB_DATA data, void *private_data)
611 {
612         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
613         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
614         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
615         uint32_t recmaster;
616         int ret;
617
618         if (data.dsize != sizeof(*b)) {
619                 DEBUG(0,("Bad data in ban_handler\n"));
620                 talloc_free(mem_ctx);
621                 return;
622         }
623
624         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
625         if (ret != 0) {
626                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
627                 talloc_free(mem_ctx);
628                 return;
629         }
630
631         if (recmaster != ctdb->pnn) {
632                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
633                 talloc_free(mem_ctx);
634                 return;
635         }
636
637         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
638                  b->pnn, b->ban_time));
639         ctdb_ban_node(rec, b->pnn, b->ban_time);
640         talloc_free(mem_ctx);
641 }
642
643 /*
644   handler for when the admin unbans a node
645 */
646 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
647                           TDB_DATA data, void *private_data)
648 {
649         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
650         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
651         uint32_t pnn;
652         int ret;
653         uint32_t recmaster;
654
655         if (data.dsize != sizeof(uint32_t)) {
656                 DEBUG(0,("Bad data in unban_handler\n"));
657                 talloc_free(mem_ctx);
658                 return;
659         }
660         pnn = *(uint32_t *)data.dptr;
661
662         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
663         if (ret != 0) {
664                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
665                 talloc_free(mem_ctx);
666                 return;
667         }
668
669         if (recmaster != ctdb->pnn) {
670                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
671                 talloc_free(mem_ctx);
672                 return;
673         }
674
675         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
676         ctdb_unban_node(rec, pnn);
677         talloc_free(mem_ctx);
678 }
679
680
681
682 /*
683   called when ctdb_wait_timeout should finish
684  */
685 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
686                               struct timeval yt, void *p)
687 {
688         uint32_t *timed_out = (uint32_t *)p;
689         (*timed_out) = 1;
690 }
691
692 /*
693   wait for a given number of seconds
694  */
695 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
696 {
697         uint32_t timed_out = 0;
698         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
699         while (!timed_out) {
700                 event_loop_once(ctdb->ev);
701         }
702 }
703
704 /* Create a new random generation ip. 
705    The generation id can not be the INVALID_GENERATION id
706 */
707 static uint32_t new_generation(void)
708 {
709         uint32_t generation;
710
711         while (1) {
712                 generation = random();
713
714                 if (generation != INVALID_GENERATION) {
715                         break;
716                 }
717         }
718
719         return generation;
720 }
721                 
722 /*
723   we are the recmaster, and recovery is needed - start a recovery run
724  */
725 static int do_recovery(struct ctdb_recoverd *rec, 
726                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
727                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
728                        uint32_t culprit)
729 {
730         struct ctdb_context *ctdb = rec->ctdb;
731         int i, j, ret;
732         uint32_t generation;
733         struct ctdb_dbid_map *dbmap;
734
735         /* if recovery fails, force it again */
736         rec->need_recovery = true;
737
738         if (rec->last_culprit != culprit ||
739             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
740                 /* either a new node is the culprit, or we've decide to forgive them */
741                 rec->last_culprit = culprit;
742                 rec->first_recover_time = timeval_current();
743                 rec->culprit_counter = 0;
744         }
745         rec->culprit_counter++;
746
747         if (rec->culprit_counter > 2*nodemap->num) {
748                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
749                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
750                          ctdb->tunable.recovery_ban_period));
751                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
752         }
753
754         if (!ctdb_recovery_lock(ctdb, true)) {
755                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
756                 return -1;
757         }
758
759         /* set recovery mode to active on all nodes */
760         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
761         if (ret!=0) {
762                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
763                 return -1;
764         }
765
766         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
767
768         /* pick a new generation number */
769         generation = new_generation();
770
771         /* change the vnnmap on this node to use the new generation 
772            number but not on any other nodes.
773            this guarantees that if we abort the recovery prematurely
774            for some reason (a node stops responding?)
775            that we can just return immediately and we will reenter
776            recovery shortly again.
777            I.e. we deliberately leave the cluster with an inconsistent
778            generation id to allow us to abort recovery at any stage and
779            just restart it from scratch.
780          */
781         vnnmap->generation = generation;
782         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
783         if (ret != 0) {
784                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
785                 return -1;
786         }
787
788         /* get a list of all databases */
789         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
790         if (ret != 0) {
791                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
792                 return -1;
793         }
794
795
796
797         /* verify that all other nodes have all our databases */
798         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
799         if (ret != 0) {
800                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
801                 return -1;
802         }
803
804         /* verify that we have all the databases any other node has */
805         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
806         if (ret != 0) {
807                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
808                 return -1;
809         }
810
811
812
813         /* verify that all other nodes have all our databases */
814         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
815         if (ret != 0) {
816                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
817                 return -1;
818         }
819
820
821         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
822
823         /* pull all remote databases onto the local node */
824         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
825         if (ret != 0) {
826                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
827                 return -1;
828         }
829
830         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
831
832         /* push all local databases to the remote nodes */
833         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
834         if (ret != 0) {
835                 DEBUG(0, (__location__ " Unable to push local databases\n"));
836                 return -1;
837         }
838
839         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
840
841         /* build a new vnn map with all the currently active and
842            unbanned nodes */
843         generation = new_generation();
844         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
845         CTDB_NO_MEMORY(ctdb, vnnmap);
846         vnnmap->generation = generation;
847         vnnmap->size = num_active;
848         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
849         for (i=j=0;i<nodemap->num;i++) {
850                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
851                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
852                 }
853         }
854
855
856
857         /* update to the new vnnmap on all nodes */
858         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
859         if (ret != 0) {
860                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
861                 return -1;
862         }
863
864         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
865
866         /* update recmaster to point to us for all nodes */
867         ret = set_recovery_master(ctdb, nodemap, pnn);
868         if (ret!=0) {
869                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
870                 return -1;
871         }
872
873         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
874
875         /* repoint all local and remote database records to the local
876            node as being dmaster
877          */
878         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
879         if (ret != 0) {
880                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
881                 return -1;
882         }
883
884         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
885
886         /*
887           update all nodes to have the same flags that we have
888          */
889         ret = update_flags_on_all_nodes(ctdb, nodemap);
890         if (ret != 0) {
891                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
892                 return -1;
893         }
894         
895         DEBUG(1, (__location__ " Recovery - updated flags\n"));
896
897         /*
898           run a vacuum operation on empty records
899          */
900         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
901         if (ret != 0) {
902                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
903                 return -1;
904         }
905
906         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
907
908         /*
909           if enabled, tell nodes to takeover their public IPs
910          */
911         if (ctdb->vnn) {
912                 rec->need_takeover_run = false;
913                 ret = ctdb_takeover_run(ctdb, nodemap);
914                 if (ret != 0) {
915                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
916                         return -1;
917                 }
918                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
919         }
920
921
922         /* disable recovery mode */
923         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
924         if (ret!=0) {
925                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
926                 return -1;
927         }
928
929         /* send a message to all clients telling them that the cluster 
930            has been reconfigured */
931         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
932
933         DEBUG(0, (__location__ " Recovery complete\n"));
934
935         rec->need_recovery = false;
936
937         /* We just finished a recovery successfully. 
938            We now wait for rerecovery_timeout before we allow 
939            another recovery to take place.
940         */
941         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
942         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
943         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
944
945         return 0;
946 }
947
948
949 /*
950   elections are won by first checking the number of connected nodes, then
951   the priority time, then the pnn
952  */
953 struct election_message {
954         uint32_t num_connected;
955         struct timeval priority_time;
956         uint32_t pnn;
957 };
958
959 /*
960   form this nodes election data
961  */
962 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
963 {
964         int ret, i;
965         struct ctdb_node_map *nodemap;
966         struct ctdb_context *ctdb = rec->ctdb;
967
968         ZERO_STRUCTP(em);
969
970         em->pnn = rec->ctdb->pnn;
971         em->priority_time = rec->priority_time;
972
973         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
974         if (ret != 0) {
975                 return;
976         }
977
978         for (i=0;i<nodemap->num;i++) {
979                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
980                         em->num_connected++;
981                 }
982         }
983         talloc_free(nodemap);
984 }
985
986 /*
987   see if the given election data wins
988  */
989 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
990 {
991         struct election_message myem;
992         int cmp;
993
994         ctdb_election_data(rec, &myem);
995
996         /* try to use the most connected node */
997         cmp = (int)myem.num_connected - (int)em->num_connected;
998
999         /* then the longest running node */
1000         if (cmp == 0) {
1001                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1002         }
1003
1004         if (cmp == 0) {
1005                 cmp = (int)myem.pnn - (int)em->pnn;
1006         }
1007
1008         return cmp > 0;
1009 }
1010
1011 /*
1012   send out an election request
1013  */
1014 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1015 {
1016         int ret;
1017         TDB_DATA election_data;
1018         struct election_message emsg;
1019         uint64_t srvid;
1020         struct ctdb_context *ctdb = rec->ctdb;
1021         
1022         srvid = CTDB_SRVID_RECOVERY;
1023
1024         ctdb_election_data(rec, &emsg);
1025
1026         election_data.dsize = sizeof(struct election_message);
1027         election_data.dptr  = (unsigned char *)&emsg;
1028
1029
1030         /* first we assume we will win the election and set 
1031            recoverymaster to be ourself on the current node
1032          */
1033         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1034         if (ret != 0) {
1035                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1036                 return -1;
1037         }
1038
1039
1040         /* send an election message to all active nodes */
1041         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1042
1043         return 0;
1044 }
1045
1046 /*
1047   this function will unban all nodes in the cluster
1048 */
1049 static void unban_all_nodes(struct ctdb_context *ctdb)
1050 {
1051         int ret, i;
1052         struct ctdb_node_map *nodemap;
1053         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1054         
1055         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1056         if (ret != 0) {
1057                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1058                 return;
1059         }
1060
1061         for (i=0;i<nodemap->num;i++) {
1062                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1063                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1064                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1065                 }
1066         }
1067
1068         talloc_free(tmp_ctx);
1069 }
1070
1071 /*
1072   handler for recovery master elections
1073 */
1074 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1075                              TDB_DATA data, void *private_data)
1076 {
1077         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1078         int ret;
1079         struct election_message *em = (struct election_message *)data.dptr;
1080         TALLOC_CTX *mem_ctx;
1081
1082         mem_ctx = talloc_new(ctdb);
1083
1084         /* someone called an election. check their election data
1085            and if we disagree and we would rather be the elected node, 
1086            send a new election message to all other nodes
1087          */
1088         if (ctdb_election_win(rec, em)) {
1089                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1090                 if (ret!=0) {
1091                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1092                 }
1093                 talloc_free(mem_ctx);
1094                 /*unban_all_nodes(ctdb);*/
1095                 return;
1096         }
1097
1098         /* release the recmaster lock */
1099         if (em->pnn != ctdb->pnn &&
1100             ctdb->recovery_lock_fd != -1) {
1101                 close(ctdb->recovery_lock_fd);
1102                 ctdb->recovery_lock_fd = -1;
1103                 unban_all_nodes(ctdb);
1104         }
1105
1106         /* ok, let that guy become recmaster then */
1107         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1108         if (ret != 0) {
1109                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1110                 talloc_free(mem_ctx);
1111                 return;
1112         }
1113
1114         /* release any bans */
1115         rec->last_culprit = (uint32_t)-1;
1116         talloc_free(rec->banned_nodes);
1117         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1118         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1119
1120         talloc_free(mem_ctx);
1121         return;
1122 }
1123
1124
1125 /*
1126   force the start of the election process
1127  */
1128 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1129                            struct ctdb_node_map *nodemap)
1130 {
1131         int ret;
1132         struct ctdb_context *ctdb = rec->ctdb;
1133
1134         /* set all nodes to recovery mode to stop all internode traffic */
1135         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1136         if (ret!=0) {
1137                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1138                 return;
1139         }
1140         
1141         ret = send_election_request(rec, mem_ctx, pnn);
1142         if (ret!=0) {
1143                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1144                 return;
1145         }
1146
1147         /* wait for a few seconds to collect all responses */
1148         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1149 }
1150
1151
1152
1153 /*
1154   handler for when a node changes its flags
1155 */
1156 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1157                             TDB_DATA data, void *private_data)
1158 {
1159         int ret;
1160         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1161         struct ctdb_node_map *nodemap=NULL;
1162         TALLOC_CTX *tmp_ctx;
1163         uint32_t changed_flags;
1164         int i;
1165         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1166
1167         if (data.dsize != sizeof(*c)) {
1168                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1169                 return;
1170         }
1171
1172         tmp_ctx = talloc_new(ctdb);
1173         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1174
1175         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1176
1177         for (i=0;i<nodemap->num;i++) {
1178                 if (nodemap->nodes[i].pnn == c->pnn) break;
1179         }
1180
1181         if (i == nodemap->num) {
1182                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1183                 talloc_free(tmp_ctx);
1184                 return;
1185         }
1186
1187         changed_flags = c->old_flags ^ c->new_flags;
1188
1189         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1190            This flag is handled locally based on whether the local node
1191            can communicate with the node or not.
1192         */
1193         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1194         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1195                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1196         }
1197
1198         if (nodemap->nodes[i].flags != c->new_flags) {
1199                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1200         }
1201
1202         nodemap->nodes[i].flags = c->new_flags;
1203
1204         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1205                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1206
1207         if (ret == 0) {
1208                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1209                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1210         }
1211         
1212         if (ret == 0 &&
1213             ctdb->recovery_master == ctdb->pnn &&
1214             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1215             ctdb->vnn) {
1216                 /* Only do the takeover run if the perm disabled or unhealthy
1217                    flags changed since these will cause an ip failover but not
1218                    a recovery.
1219                    If the node became disconnected or banned this will also
1220                    lead to an ip address failover but that is handled 
1221                    during recovery
1222                 */
1223                 if (changed_flags & NODE_FLAGS_DISABLED) {
1224                         rec->need_takeover_run = true;
1225                 }
1226         }
1227
1228         talloc_free(tmp_ctx);
1229 }
1230
1231
1232
1233 struct verify_recmode_normal_data {
1234         uint32_t count;
1235         enum monitor_result status;
1236 };
1237
1238 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1239 {
1240         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private, struct verify_recmode_normal_data);
1241
1242
1243         /* one more node has responded with recmode data*/
1244         rmdata->count--;
1245
1246         /* if we failed to get the recmode, then return an error and let
1247            the main loop try again.
1248         */
1249         if (state->state != CTDB_CONTROL_DONE) {
1250                 if (rmdata->status == MONITOR_OK) {
1251                         rmdata->status = MONITOR_FAILED;
1252                 }
1253                 return;
1254         }
1255
1256         /* if we got a response, then the recmode will be stored in the
1257            status field
1258         */
1259         if (state->status != CTDB_RECOVERY_NORMAL) {
1260                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1261                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1262         }
1263
1264         return;
1265 }
1266
1267
1268 /* verify that all nodes are in normal recovery mode */
1269 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1270 {
1271         struct verify_recmode_normal_data *rmdata;
1272         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1273         struct ctdb_client_control_state *state;
1274         enum monitor_result status;
1275         int j;
1276         
1277         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1278         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1279         rmdata->count  = 0;
1280         rmdata->status = MONITOR_OK;
1281
1282         /* loop over all active nodes and send an async getrecmode call to 
1283            them*/
1284         for (j=0; j<nodemap->num; j++) {
1285                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1286                         continue;
1287                 }
1288                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1289                                         CONTROL_TIMEOUT(), 
1290                                         nodemap->nodes[j].pnn);
1291                 if (state == NULL) {
1292                         /* we failed to send the control, treat this as 
1293                            an error and try again next iteration
1294                         */                      
1295                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1296                         talloc_free(mem_ctx);
1297                         return MONITOR_FAILED;
1298                 }
1299
1300                 /* set up the callback functions */
1301                 state->async.fn = verify_recmode_normal_callback;
1302                 state->async.private = rmdata;
1303
1304                 /* one more control to wait for to complete */
1305                 rmdata->count++;
1306         }
1307
1308
1309         /* now wait for up to the maximum number of seconds allowed
1310            or until all nodes we expect a response from has replied
1311         */
1312         while (rmdata->count > 0) {
1313                 event_loop_once(ctdb->ev);
1314         }
1315
1316         status = rmdata->status;
1317         talloc_free(mem_ctx);
1318         return status;
1319 }
1320
1321
1322 struct verify_recmaster_data {
1323         uint32_t count;
1324         uint32_t pnn;
1325         enum monitor_result status;
1326 };
1327
1328 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1329 {
1330         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private, struct verify_recmaster_data);
1331
1332
1333         /* one more node has responded with recmaster data*/
1334         rmdata->count--;
1335
1336         /* if we failed to get the recmaster, then return an error and let
1337            the main loop try again.
1338         */
1339         if (state->state != CTDB_CONTROL_DONE) {
1340                 if (rmdata->status == MONITOR_OK) {
1341                         rmdata->status = MONITOR_FAILED;
1342                 }
1343                 return;
1344         }
1345
1346         /* if we got a response, then the recmaster will be stored in the
1347            status field
1348         */
1349         if (state->status != rmdata->pnn) {
1350                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1351                 rmdata->status = MONITOR_ELECTION_NEEDED;
1352         }
1353
1354         return;
1355 }
1356
1357
1358 /* verify that all nodes agree that we are the recmaster */
1359 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1360 {
1361         struct verify_recmaster_data *rmdata;
1362         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1363         struct ctdb_client_control_state *state;
1364         enum monitor_result status;
1365         int j;
1366         
1367         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1368         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1369         rmdata->count  = 0;
1370         rmdata->pnn    = pnn;
1371         rmdata->status = MONITOR_OK;
1372
1373         /* loop over all active nodes and send an async getrecmaster call to 
1374            them*/
1375         for (j=0; j<nodemap->num; j++) {
1376                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1377                         continue;
1378                 }
1379                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1380                                         CONTROL_TIMEOUT(),
1381                                         nodemap->nodes[j].pnn);
1382                 if (state == NULL) {
1383                         /* we failed to send the control, treat this as 
1384                            an error and try again next iteration
1385                         */                      
1386                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1387                         talloc_free(mem_ctx);
1388                         return MONITOR_FAILED;
1389                 }
1390
1391                 /* set up the callback functions */
1392                 state->async.fn = verify_recmaster_callback;
1393                 state->async.private = rmdata;
1394
1395                 /* one more control to wait for to complete */
1396                 rmdata->count++;
1397         }
1398
1399
1400         /* now wait for up to the maximum number of seconds allowed
1401            or until all nodes we expect a response from has replied
1402         */
1403         while (rmdata->count > 0) {
1404                 event_loop_once(ctdb->ev);
1405         }
1406
1407         status = rmdata->status;
1408         talloc_free(mem_ctx);
1409         return status;
1410 }
1411
1412
1413 /*
1414   the main monitoring loop
1415  */
1416 static void monitor_cluster(struct ctdb_context *ctdb)
1417 {
1418         uint32_t pnn, num_active, recmaster;
1419         TALLOC_CTX *mem_ctx=NULL;
1420         struct ctdb_node_map *nodemap=NULL;
1421         struct ctdb_node_map *remote_nodemap=NULL;
1422         struct ctdb_vnn_map *vnnmap=NULL;
1423         struct ctdb_vnn_map *remote_vnnmap=NULL;
1424         int i, j, ret;
1425         struct ctdb_recoverd *rec;
1426
1427         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1428         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1429
1430         rec->ctdb = ctdb;
1431         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1432         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1433
1434         rec->priority_time = timeval_current();
1435
1436         /* register a message port for recovery elections */
1437         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1438
1439         /* and one for when nodes are disabled/enabled */
1440         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1441
1442         /* and one for when nodes are banned */
1443         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1444
1445         /* and one for when nodes are unbanned */
1446         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1447         
1448 again:
1449         if (mem_ctx) {
1450                 talloc_free(mem_ctx);
1451                 mem_ctx = NULL;
1452         }
1453         mem_ctx = talloc_new(ctdb);
1454         if (!mem_ctx) {
1455                 DEBUG(0,("Failed to create temporary context\n"));
1456                 exit(-1);
1457         }
1458
1459         /* we only check for recovery once every second */
1460         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1461
1462         /* get relevant tunables */
1463         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1464         if (ret != 0) {
1465                 DEBUG(0,("Failed to get tunables - retrying\n"));
1466                 goto again;
1467         }
1468
1469         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1470         if (pnn == (uint32_t)-1) {
1471                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1472                 goto again;
1473         }
1474
1475         /* get the vnnmap */
1476         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1477         if (ret != 0) {
1478                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1479                 goto again;
1480         }
1481
1482
1483         /* get number of nodes */
1484         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1485         if (ret != 0) {
1486                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1487                 goto again;
1488         }
1489
1490
1491         /* count how many active nodes there are */
1492         num_active = 0;
1493         for (i=0; i<nodemap->num; i++) {
1494                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1495                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1496                 } else {
1497                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1498                 }
1499                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1500                         num_active++;
1501                 }
1502         }
1503
1504
1505         /* check which node is the recovery master */
1506         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1507         if (ret != 0) {
1508                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1509                 goto again;
1510         }
1511
1512         if (recmaster == (uint32_t)-1) {
1513                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1514                 force_election(rec, mem_ctx, pnn, nodemap);
1515                 goto again;
1516         }
1517         
1518         /* verify that the recmaster node is still active */
1519         for (j=0; j<nodemap->num; j++) {
1520                 if (nodemap->nodes[j].pnn==recmaster) {
1521                         break;
1522                 }
1523         }
1524
1525         if (j == nodemap->num) {
1526                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1527                 force_election(rec, mem_ctx, pnn, nodemap);
1528                 goto again;
1529         }
1530
1531         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1532                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1533                 force_election(rec, mem_ctx, pnn, nodemap);
1534                 goto again;
1535         }
1536         
1537
1538         /* if we are not the recmaster then we do not need to check
1539            if recovery is needed
1540          */
1541         if (pnn != recmaster) {
1542                 goto again;
1543         }
1544
1545
1546         /* update the list of public ips that a node can handle for
1547            all connected nodes
1548         */
1549         for (j=0; j<nodemap->num; j++) {
1550                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1551                         continue;
1552                 }
1553                 /* release any existing data */
1554                 if (ctdb->nodes[j]->public_ips) {
1555                         talloc_free(ctdb->nodes[j]->public_ips);
1556                         ctdb->nodes[j]->public_ips = NULL;
1557                 }
1558                 /* grab a new shiny list of public ips from the node */
1559                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1560                         ctdb->nodes[j]->pnn, 
1561                         ctdb->nodes,
1562                         &ctdb->nodes[j]->public_ips)) {
1563                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1564                                 ctdb->nodes[j]->pnn));
1565                         goto again;
1566                 }
1567         }
1568
1569
1570         /* verify that all active nodes agree that we are the recmaster */
1571         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1572         case MONITOR_RECOVERY_NEEDED:
1573                 /* can not happen */
1574                 goto again;
1575         case MONITOR_ELECTION_NEEDED:
1576                 force_election(rec, mem_ctx, pnn, nodemap);
1577                 goto again;
1578         case MONITOR_OK:
1579                 break;
1580         case MONITOR_FAILED:
1581                 goto again;
1582         }
1583
1584
1585         if (rec->need_recovery) {
1586                 /* a previous recovery didn't finish */
1587                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1588                 goto again;             
1589         }
1590
1591         /* verify that all active nodes are in normal mode 
1592            and not in recovery mode 
1593          */
1594         switch (verify_recmode(ctdb, nodemap)) {
1595         case MONITOR_RECOVERY_NEEDED:
1596                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1597                 goto again;
1598         case MONITOR_FAILED:
1599                 goto again;
1600         case MONITOR_ELECTION_NEEDED:
1601                 /* can not happen */
1602         case MONITOR_OK:
1603                 break;
1604         }
1605
1606
1607
1608         /* get the nodemap for all active remote nodes and verify
1609            they are the same as for this node
1610          */
1611         for (j=0; j<nodemap->num; j++) {
1612                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1613                         continue;
1614                 }
1615                 if (nodemap->nodes[j].pnn == pnn) {
1616                         continue;
1617                 }
1618
1619                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1620                                            mem_ctx, &remote_nodemap);
1621                 if (ret != 0) {
1622                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1623                                   nodemap->nodes[j].pnn));
1624                         goto again;
1625                 }
1626
1627                 /* if the nodes disagree on how many nodes there are
1628                    then this is a good reason to try recovery
1629                  */
1630                 if (remote_nodemap->num != nodemap->num) {
1631                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1632                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1633                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1634                         goto again;
1635                 }
1636
1637                 /* if the nodes disagree on which nodes exist and are
1638                    active, then that is also a good reason to do recovery
1639                  */
1640                 for (i=0;i<nodemap->num;i++) {
1641                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1642                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1643                                           nodemap->nodes[j].pnn, i, 
1644                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1645                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1646                                             vnnmap, nodemap->nodes[j].pnn);
1647                                 goto again;
1648                         }
1649                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1650                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1651                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1652                                           nodemap->nodes[j].pnn, i,
1653                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1654                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1655                                             vnnmap, nodemap->nodes[j].pnn);
1656                                 goto again;
1657                         }
1658                 }
1659
1660                 /* update our nodemap flags according to the other
1661                    server - this gets the NODE_FLAGS_DISABLED
1662                    flag. Note that the remote node is authoritative
1663                    for its flags (except CONNECTED, which we know
1664                    matches in this code) */
1665                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1666                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1667                         rec->need_takeover_run = true;
1668                 }
1669         }
1670
1671
1672         /* there better be the same number of lmasters in the vnn map
1673            as there are active nodes or we will have to do a recovery
1674          */
1675         if (vnnmap->size != num_active) {
1676                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1677                           vnnmap->size, num_active));
1678                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1679                 goto again;
1680         }
1681
1682         /* verify that all active nodes in the nodemap also exist in 
1683            the vnnmap.
1684          */
1685         for (j=0; j<nodemap->num; j++) {
1686                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1687                         continue;
1688                 }
1689                 if (nodemap->nodes[j].pnn == pnn) {
1690                         continue;
1691                 }
1692
1693                 for (i=0; i<vnnmap->size; i++) {
1694                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1695                                 break;
1696                         }
1697                 }
1698                 if (i == vnnmap->size) {
1699                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1700                                   nodemap->nodes[j].pnn));
1701                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1702                         goto again;
1703                 }
1704         }
1705
1706         
1707         /* verify that all other nodes have the same vnnmap
1708            and are from the same generation
1709          */
1710         for (j=0; j<nodemap->num; j++) {
1711                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1712                         continue;
1713                 }
1714                 if (nodemap->nodes[j].pnn == pnn) {
1715                         continue;
1716                 }
1717
1718                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1719                                           mem_ctx, &remote_vnnmap);
1720                 if (ret != 0) {
1721                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1722                                   nodemap->nodes[j].pnn));
1723                         goto again;
1724                 }
1725
1726                 /* verify the vnnmap generation is the same */
1727                 if (vnnmap->generation != remote_vnnmap->generation) {
1728                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1729                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1730                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1731                         goto again;
1732                 }
1733
1734                 /* verify the vnnmap size is the same */
1735                 if (vnnmap->size != remote_vnnmap->size) {
1736                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1737                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1738                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1739                         goto again;
1740                 }
1741
1742                 /* verify the vnnmap is the same */
1743                 for (i=0;i<vnnmap->size;i++) {
1744                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1745                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1746                                           nodemap->nodes[j].pnn));
1747                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1748                                             vnnmap, nodemap->nodes[j].pnn);
1749                                 goto again;
1750                         }
1751                 }
1752         }
1753
1754         /* we might need to change who has what IP assigned */
1755         if (rec->need_takeover_run) {
1756                 rec->need_takeover_run = false;
1757                 ret = ctdb_takeover_run(ctdb, nodemap);
1758                 if (ret != 0) {
1759                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1760                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1761                                     vnnmap, nodemap->nodes[j].pnn);
1762                 }
1763         }
1764
1765         goto again;
1766
1767 }
1768
1769 /*
1770   event handler for when the main ctdbd dies
1771  */
1772 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1773                                  uint16_t flags, void *private_data)
1774 {
1775         DEBUG(0,("recovery daemon parent died - exiting\n"));
1776         _exit(1);
1777 }
1778
1779
1780
1781 /*
1782   startup the recovery daemon as a child of the main ctdb daemon
1783  */
1784 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1785 {
1786         int ret;
1787         int fd[2];
1788         pid_t child;
1789
1790         if (pipe(fd) != 0) {
1791                 return -1;
1792         }
1793
1794         child = fork();
1795         if (child == -1) {
1796                 return -1;
1797         }
1798         
1799         if (child != 0) {
1800                 close(fd[0]);
1801                 return 0;
1802         }
1803
1804         close(fd[1]);
1805
1806         /* shutdown the transport */
1807         ctdb->methods->shutdown(ctdb);
1808
1809         /* get a new event context */
1810         talloc_free(ctdb->ev);
1811         ctdb->ev = event_context_init(ctdb);
1812
1813         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1814                      ctdb_recoverd_parent, &fd[0]);     
1815
1816         close(ctdb->daemon.sd);
1817         ctdb->daemon.sd = -1;
1818
1819         srandom(getpid() ^ time(NULL));
1820
1821         /* initialise ctdb */
1822         ret = ctdb_socket_connect(ctdb);
1823         if (ret != 0) {
1824                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1825                 exit(1);
1826         }
1827
1828         monitor_cluster(ctdb);
1829
1830         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1831         return -1;
1832 }