when a ctdb_takeover_run has failed we must make sure that
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "popt.h"
25 #include "cmdline.h"
26 #include "../include/ctdb.h"
27 #include "../include/ctdb_private.h"
28
29
30 struct ban_state {
31         struct ctdb_recoverd *rec;
32         uint32_t banned_node;
33 };
34
35 /*
36   private state of recovery daemon
37  */
38 struct ctdb_recoverd {
39         struct ctdb_context *ctdb;
40         uint32_t last_culprit;
41         uint32_t culprit_counter;
42         struct timeval first_recover_time;
43         struct ban_state **banned_nodes;
44         struct timeval priority_time;
45         bool need_takeover_run;
46 };
47
48 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
49 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
50
51 /*
52   unban a node
53  */
54 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
55 {
56         struct ctdb_context *ctdb = rec->ctdb;
57
58         if (!ctdb_validate_pnn(ctdb, pnn)) {
59                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
60                 return;
61         }
62
63         if (rec->banned_nodes[pnn] == NULL) {
64                 return;
65         }
66
67         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
68
69         talloc_free(rec->banned_nodes[pnn]);
70         rec->banned_nodes[pnn] = NULL;
71 }
72
73
74 /*
75   called when a ban has timed out
76  */
77 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
78 {
79         struct ban_state *state = talloc_get_type(p, struct ban_state);
80         struct ctdb_recoverd *rec = state->rec;
81         uint32_t pnn = state->banned_node;
82
83         DEBUG(0,("Node %u is now unbanned\n", pnn));
84         ctdb_unban_node(rec, pnn);
85 }
86
87 /*
88   ban a node for a period of time
89  */
90 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
91 {
92         struct ctdb_context *ctdb = rec->ctdb;
93
94         if (!ctdb_validate_pnn(ctdb, pnn)) {
95                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
96                 return;
97         }
98
99         if (pnn == ctdb->pnn) {
100                 DEBUG(0,("self ban - lowering our election priority\n"));
101                 /* banning ourselves - lower our election priority */
102                 rec->priority_time = timeval_current();
103         }
104
105         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
106
107         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
108         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
109
110         rec->banned_nodes[pnn]->rec = rec;
111         rec->banned_nodes[pnn]->banned_node = pnn;
112
113         if (ban_time != 0) {
114                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
115                                 timeval_current_ofs(ban_time, 0),
116                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
117         }
118 }
119
120 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
121
122
123 struct freeze_node_data {
124         uint32_t count;
125         enum monitor_result status;
126 };
127
128
129 static void freeze_node_callback(struct ctdb_client_control_state *state)
130 {
131         struct freeze_node_data *fndata = talloc_get_type(state->async.private, struct freeze_node_data);
132
133
134         /* one more node has responded to our freeze node*/
135         fndata->count--;
136
137         /* if we failed to freeze the node, we must trigger another recovery */
138         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
139                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
140                 fndata->status = MONITOR_RECOVERY_NEEDED;
141         }
142
143         return;
144 }
145
146
147
148 /* freeze all nodes */
149 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
150 {
151         struct freeze_node_data *fndata;
152         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
153         struct ctdb_client_control_state *state;
154         enum monitor_result status;
155         int j;
156         
157         fndata = talloc(mem_ctx, struct freeze_node_data);
158         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
159         fndata->count  = 0;
160         fndata->status = MONITOR_OK;
161
162         /* loop over all active nodes and send an async freeze call to 
163            them*/
164         for (j=0; j<nodemap->num; j++) {
165                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
166                         continue;
167                 }
168                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
169                                         CONTROL_TIMEOUT(), 
170                                         nodemap->nodes[j].pnn);
171                 if (state == NULL) {
172                         /* we failed to send the control, treat this as 
173                            an error and try again next iteration
174                         */                      
175                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
176                         talloc_free(mem_ctx);
177                         return MONITOR_RECOVERY_NEEDED;
178                 }
179
180                 /* set up the callback functions */
181                 state->async.fn = freeze_node_callback;
182                 state->async.private = fndata;
183
184                 /* one more control to wait for to complete */
185                 fndata->count++;
186         }
187
188
189         /* now wait for up to the maximum number of seconds allowed
190            or until all nodes we expect a response from has replied
191         */
192         while (fndata->count > 0) {
193                 event_loop_once(ctdb->ev);
194         }
195
196         status = fndata->status;
197         talloc_free(mem_ctx);
198         return status;
199 }
200
201
202 /*
203   change recovery mode on all nodes
204  */
205 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
206 {
207         int j, ret;
208
209         /* freeze all nodes */
210         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
211                 ret = freeze_all_nodes(ctdb, nodemap);
212                 if (ret != MONITOR_OK) {
213                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
214                         return -1;
215                 }
216         }
217
218
219         /* set recovery mode to active on all nodes */
220         for (j=0; j<nodemap->num; j++) {
221                 /* dont change it for nodes that are unavailable */
222                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
223                         continue;
224                 }
225
226                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
227                 if (ret != 0) {
228                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
229                         return -1;
230                 }
231
232                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
233                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
234                         if (ret != 0) {
235                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
236                                 return -1;
237                         }
238                 }
239         }
240
241         return 0;
242 }
243
244 /*
245   change recovery master on all node
246  */
247 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
248 {
249         int j, ret;
250
251         /* set recovery master to pnn on all nodes */
252         for (j=0; j<nodemap->num; j++) {
253                 /* dont change it for nodes that are unavailable */
254                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
255                         continue;
256                 }
257
258                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
259                 if (ret != 0) {
260                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
261                         return -1;
262                 }
263         }
264
265         return 0;
266 }
267
268
269 /*
270   ensure all other nodes have attached to any databases that we have
271  */
272 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
273                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
274 {
275         int i, j, db, ret;
276         struct ctdb_dbid_map *remote_dbmap;
277
278         /* verify that all other nodes have all our databases */
279         for (j=0; j<nodemap->num; j++) {
280                 /* we dont need to ourself ourselves */
281                 if (nodemap->nodes[j].pnn == pnn) {
282                         continue;
283                 }
284                 /* dont check nodes that are unavailable */
285                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
286                         continue;
287                 }
288
289                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
290                                          mem_ctx, &remote_dbmap);
291                 if (ret != 0) {
292                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
293                         return -1;
294                 }
295
296                 /* step through all local databases */
297                 for (db=0; db<dbmap->num;db++) {
298                         const char *name;
299
300
301                         for (i=0;i<remote_dbmap->num;i++) {
302                                 if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
303                                         break;
304                                 }
305                         }
306                         /* the remote node already have this database */
307                         if (i!=remote_dbmap->num) {
308                                 continue;
309                         }
310                         /* ok so we need to create this database */
311                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
312                         if (ret != 0) {
313                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
314                                 return -1;
315                         }
316                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
317                         if (ret != 0) {
318                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
319                                 return -1;
320                         }
321                 }
322         }
323
324         return 0;
325 }
326
327
328 /*
329   ensure we are attached to any databases that anyone else is attached to
330  */
331 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
332                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
333 {
334         int i, j, db, ret;
335         struct ctdb_dbid_map *remote_dbmap;
336
337         /* verify that we have all database any other node has */
338         for (j=0; j<nodemap->num; j++) {
339                 /* we dont need to ourself ourselves */
340                 if (nodemap->nodes[j].pnn == pnn) {
341                         continue;
342                 }
343                 /* dont check nodes that are unavailable */
344                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
345                         continue;
346                 }
347
348                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
349                                          mem_ctx, &remote_dbmap);
350                 if (ret != 0) {
351                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
352                         return -1;
353                 }
354
355                 /* step through all databases on the remote node */
356                 for (db=0; db<remote_dbmap->num;db++) {
357                         const char *name;
358
359                         for (i=0;i<(*dbmap)->num;i++) {
360                                 if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
361                                         break;
362                                 }
363                         }
364                         /* we already have this db locally */
365                         if (i!=(*dbmap)->num) {
366                                 continue;
367                         }
368                         /* ok so we need to create this database and
369                            rebuild dbmap
370                          */
371                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
372                                             remote_dbmap->dbids[db], mem_ctx, &name);
373                         if (ret != 0) {
374                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
375                                           nodemap->nodes[j].pnn));
376                                 return -1;
377                         }
378                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
379                         if (ret != 0) {
380                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
381                                 return -1;
382                         }
383                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
384                         if (ret != 0) {
385                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
386                                 return -1;
387                         }
388                 }
389         }
390
391         return 0;
392 }
393
394
395 /*
396   pull all the remote database contents into ours
397  */
398 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
399                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
400 {
401         int i, j, ret;
402
403         /* pull all records from all other nodes across onto this node
404            (this merges based on rsn)
405         */
406         for (i=0;i<dbmap->num;i++) {
407                 for (j=0; j<nodemap->num; j++) {
408                         /* we dont need to merge with ourselves */
409                         if (nodemap->nodes[j].pnn == pnn) {
410                                 continue;
411                         }
412                         /* dont merge from nodes that are unavailable */
413                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
414                                 continue;
415                         }
416                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
417                                                pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
418                         if (ret != 0) {
419                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
420                                           nodemap->nodes[j].pnn, pnn));
421                                 return -1;
422                         }
423                 }
424         }
425
426         return 0;
427 }
428
429
430 /*
431   change the dmaster on all databases to point to us
432  */
433 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
434                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
435 {
436         int i, j, ret;
437
438         /* update dmaster to point to this node for all databases/nodes */
439         for (i=0;i<dbmap->num;i++) {
440                 for (j=0; j<nodemap->num; j++) {
441                         /* dont repoint nodes that are unavailable */
442                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
443                                 continue;
444                         }
445                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
446                         if (ret != 0) {
447                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
448                                 return -1;
449                         }
450                 }
451         }
452
453         return 0;
454 }
455
456
457 /*
458   update flags on all active nodes
459  */
460 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
461 {
462         int i;
463         for (i=0;i<nodemap->num;i++) {
464                 struct ctdb_node_flag_change c;
465                 TDB_DATA data;
466
467                 c.pnn = nodemap->nodes[i].pnn;
468                 c.old_flags = nodemap->nodes[i].flags;
469                 c.new_flags = nodemap->nodes[i].flags;
470
471                 data.dptr = (uint8_t *)&c;
472                 data.dsize = sizeof(c);
473
474                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
475                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
476
477         }
478         return 0;
479 }
480
481 /*
482   vacuum one database
483  */
484 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
485 {
486         uint64_t max_rsn;
487         int ret, i;
488
489         /* find max rsn on our local node for this db */
490         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
491         if (ret != 0) {
492                 return -1;
493         }
494
495         /* set rsn on non-empty records to max_rsn+1 */
496         for (i=0;i<nodemap->num;i++) {
497                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
498                         continue;
499                 }
500                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
501                                                  db_id, max_rsn+1);
502                 if (ret != 0) {
503                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
504                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
505                         return -1;
506                 }
507         }
508
509         /* delete records with rsn < max_rsn+1 on all nodes */
510         for (i=0;i<nodemap->num;i++) {
511                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
512                         continue;
513                 }
514                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
515                                                  db_id, max_rsn+1);
516                 if (ret != 0) {
517                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
518                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
519                         return -1;
520                 }
521         }
522
523
524         return 0;
525 }
526
527
528 /*
529   vacuum all attached databases
530  */
531 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
532                                 struct ctdb_dbid_map *dbmap)
533 {
534         int i;
535
536         /* update dmaster to point to this node for all databases/nodes */
537         for (i=0;i<dbmap->num;i++) {
538                 if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
539                         return -1;
540                 }
541         }
542         return 0;
543 }
544
545
546 /*
547   push out all our database contents to all other nodes
548  */
549 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
550                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
551 {
552         int i, j, ret;
553
554         /* push all records out to the nodes again */
555         for (i=0;i<dbmap->num;i++) {
556                 for (j=0; j<nodemap->num; j++) {
557                         /* we dont need to push to ourselves */
558                         if (nodemap->nodes[j].pnn == pnn) {
559                                 continue;
560                         }
561                         /* dont push to nodes that are unavailable */
562                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
563                                 continue;
564                         }
565                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
566                                                dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
567                         if (ret != 0) {
568                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
569                                           pnn, nodemap->nodes[j].pnn));
570                                 return -1;
571                         }
572                 }
573         }
574
575         return 0;
576 }
577
578
579 /*
580   ensure all nodes have the same vnnmap we do
581  */
582 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
583                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
584 {
585         int j, ret;
586
587         /* push the new vnn map out to all the nodes */
588         for (j=0; j<nodemap->num; j++) {
589                 /* dont push to nodes that are unavailable */
590                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
591                         continue;
592                 }
593
594                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
595                 if (ret != 0) {
596                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
597                         return -1;
598                 }
599         }
600
601         return 0;
602 }
603
604
605 /*
606   handler for when the admin bans a node
607 */
608 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
609                         TDB_DATA data, void *private_data)
610 {
611         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
612         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
613         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
614         uint32_t recmaster;
615         int ret;
616
617         if (data.dsize != sizeof(*b)) {
618                 DEBUG(0,("Bad data in ban_handler\n"));
619                 talloc_free(mem_ctx);
620                 return;
621         }
622
623         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
624         if (ret != 0) {
625                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
626                 talloc_free(mem_ctx);
627                 return;
628         }
629
630         if (recmaster != ctdb->pnn) {
631                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
632                 talloc_free(mem_ctx);
633                 return;
634         }
635
636         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
637                  b->pnn, b->ban_time));
638         ctdb_ban_node(rec, b->pnn, b->ban_time);
639         talloc_free(mem_ctx);
640 }
641
642 /*
643   handler for when the admin unbans a node
644 */
645 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
646                           TDB_DATA data, void *private_data)
647 {
648         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
649         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
650         uint32_t pnn;
651         int ret;
652         uint32_t recmaster;
653
654         if (data.dsize != sizeof(uint32_t)) {
655                 DEBUG(0,("Bad data in unban_handler\n"));
656                 talloc_free(mem_ctx);
657                 return;
658         }
659         pnn = *(uint32_t *)data.dptr;
660
661         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
662         if (ret != 0) {
663                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
664                 talloc_free(mem_ctx);
665                 return;
666         }
667
668         if (recmaster != ctdb->pnn) {
669                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
670                 talloc_free(mem_ctx);
671                 return;
672         }
673
674         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
675         ctdb_unban_node(rec, pnn);
676         talloc_free(mem_ctx);
677 }
678
679
680
681 /*
682   called when ctdb_wait_timeout should finish
683  */
684 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
685                               struct timeval yt, void *p)
686 {
687         uint32_t *timed_out = (uint32_t *)p;
688         (*timed_out) = 1;
689 }
690
691 /*
692   wait for a given number of seconds
693  */
694 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
695 {
696         uint32_t timed_out = 0;
697         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
698         while (!timed_out) {
699                 event_loop_once(ctdb->ev);
700         }
701 }
702
703 /* Create a new random generation ip. 
704    The generation id can not be the INVALID_GENERATION id
705 */
706 static uint32_t new_generation(void)
707 {
708         uint32_t generation;
709
710         while (1) {
711                 generation = random();
712
713                 if (generation != INVALID_GENERATION) {
714                         break;
715                 }
716         }
717
718         return generation;
719 }
720                 
721 /*
722   we are the recmaster, and recovery is needed - start a recovery run
723  */
724 static int do_recovery(struct ctdb_recoverd *rec, 
725                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
726                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
727                        uint32_t culprit)
728 {
729         struct ctdb_context *ctdb = rec->ctdb;
730         int i, j, ret;
731         uint32_t generation;
732         struct ctdb_dbid_map *dbmap;
733
734         if (rec->last_culprit != culprit ||
735             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
736                 /* either a new node is the culprit, or we've decide to forgive them */
737                 rec->last_culprit = culprit;
738                 rec->first_recover_time = timeval_current();
739                 rec->culprit_counter = 0;
740         }
741         rec->culprit_counter++;
742
743         if (rec->culprit_counter > 2*nodemap->num) {
744                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
745                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
746                          ctdb->tunable.recovery_ban_period));
747                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
748         }
749
750         if (!ctdb_recovery_lock(ctdb, true)) {
751                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
752                 return -1;
753         }
754
755         /* set recovery mode to active on all nodes */
756         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
757         if (ret!=0) {
758                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
759                 return -1;
760         }
761
762         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
763
764         /* pick a new generation number */
765         generation = new_generation();
766
767         /* change the vnnmap on this node to use the new generation 
768            number but not on any other nodes.
769            this guarantees that if we abort the recovery prematurely
770            for some reason (a node stops responding?)
771            that we can just return immediately and we will reenter
772            recovery shortly again.
773            I.e. we deliberately leave the cluster with an inconsistent
774            generation id to allow us to abort recovery at any stage and
775            just restart it from scratch.
776          */
777         vnnmap->generation = generation;
778         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
779         if (ret != 0) {
780                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
781                 return -1;
782         }
783
784         /* get a list of all databases */
785         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
786         if (ret != 0) {
787                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
788                 return -1;
789         }
790
791
792
793         /* verify that all other nodes have all our databases */
794         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
795         if (ret != 0) {
796                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
797                 return -1;
798         }
799
800         /* verify that we have all the databases any other node has */
801         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
802         if (ret != 0) {
803                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
804                 return -1;
805         }
806
807
808
809         /* verify that all other nodes have all our databases */
810         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
811         if (ret != 0) {
812                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
813                 return -1;
814         }
815
816
817         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
818
819         /* pull all remote databases onto the local node */
820         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
821         if (ret != 0) {
822                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
823                 return -1;
824         }
825
826         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
827
828         /* push all local databases to the remote nodes */
829         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
830         if (ret != 0) {
831                 DEBUG(0, (__location__ " Unable to push local databases\n"));
832                 return -1;
833         }
834
835         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
836
837         /* build a new vnn map with all the currently active and
838            unbanned nodes */
839         generation = new_generation();
840         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
841         CTDB_NO_MEMORY(ctdb, vnnmap);
842         vnnmap->generation = generation;
843         vnnmap->size = num_active;
844         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
845         for (i=j=0;i<nodemap->num;i++) {
846                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
847                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
848                 }
849         }
850
851
852
853         /* update to the new vnnmap on all nodes */
854         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
855         if (ret != 0) {
856                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
857                 return -1;
858         }
859
860         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
861
862         /* update recmaster to point to us for all nodes */
863         ret = set_recovery_master(ctdb, nodemap, pnn);
864         if (ret!=0) {
865                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
866                 return -1;
867         }
868
869         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
870
871         /* repoint all local and remote database records to the local
872            node as being dmaster
873          */
874         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
875         if (ret != 0) {
876                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
877                 return -1;
878         }
879
880         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
881
882         /*
883           update all nodes to have the same flags that we have
884          */
885         ret = update_flags_on_all_nodes(ctdb, nodemap);
886         if (ret != 0) {
887                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
888                 return -1;
889         }
890         
891         DEBUG(1, (__location__ " Recovery - updated flags\n"));
892
893         /*
894           run a vacuum operation on empty records
895          */
896         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
897         if (ret != 0) {
898                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
899                 return -1;
900         }
901
902         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
903
904         /*
905           if enabled, tell nodes to takeover their public IPs
906          */
907         if (ctdb->vnn) {
908                 rec->need_takeover_run = false;
909                 ret = ctdb_takeover_run(ctdb, nodemap);
910                 if (ret != 0) {
911                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
912                         rec->need_takeover_run = true;
913                         return -1;
914                 }
915                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
916         }
917
918
919         /* disable recovery mode */
920         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
921         if (ret!=0) {
922                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
923                 return -1;
924         }
925
926         /* send a message to all clients telling them that the cluster 
927            has been reconfigured */
928         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
929
930         DEBUG(0, (__location__ " Recovery complete\n"));
931
932         /* We just finished a recovery successfully. 
933            We now wait for rerecovery_timeout before we allow 
934            another recovery to take place.
935         */
936         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
937         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
938         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
939
940         return 0;
941 }
942
943
944 /*
945   elections are won by first checking the number of connected nodes, then
946   the priority time, then the pnn
947  */
948 struct election_message {
949         uint32_t num_connected;
950         struct timeval priority_time;
951         uint32_t pnn;
952 };
953
954 /*
955   form this nodes election data
956  */
957 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
958 {
959         int ret, i;
960         struct ctdb_node_map *nodemap;
961         struct ctdb_context *ctdb = rec->ctdb;
962
963         ZERO_STRUCTP(em);
964
965         em->pnn = rec->ctdb->pnn;
966         em->priority_time = rec->priority_time;
967
968         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
969         if (ret != 0) {
970                 return;
971         }
972
973         for (i=0;i<nodemap->num;i++) {
974                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
975                         em->num_connected++;
976                 }
977         }
978         talloc_free(nodemap);
979 }
980
981 /*
982   see if the given election data wins
983  */
984 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
985 {
986         struct election_message myem;
987         int cmp;
988
989         ctdb_election_data(rec, &myem);
990
991         /* try to use the most connected node */
992         cmp = (int)myem.num_connected - (int)em->num_connected;
993
994         /* then the longest running node */
995         if (cmp == 0) {
996                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
997         }
998
999         if (cmp == 0) {
1000                 cmp = (int)myem.pnn - (int)em->pnn;
1001         }
1002
1003         return cmp > 0;
1004 }
1005
1006 /*
1007   send out an election request
1008  */
1009 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1010 {
1011         int ret;
1012         TDB_DATA election_data;
1013         struct election_message emsg;
1014         uint64_t srvid;
1015         struct ctdb_context *ctdb = rec->ctdb;
1016         
1017         srvid = CTDB_SRVID_RECOVERY;
1018
1019         ctdb_election_data(rec, &emsg);
1020
1021         election_data.dsize = sizeof(struct election_message);
1022         election_data.dptr  = (unsigned char *)&emsg;
1023
1024
1025         /* first we assume we will win the election and set 
1026            recoverymaster to be ourself on the current node
1027          */
1028         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1029         if (ret != 0) {
1030                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1031                 return -1;
1032         }
1033
1034
1035         /* send an election message to all active nodes */
1036         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1037
1038         return 0;
1039 }
1040
1041 /*
1042   this function will unban all nodes in the cluster
1043 */
1044 static void unban_all_nodes(struct ctdb_context *ctdb)
1045 {
1046         int ret, i;
1047         struct ctdb_node_map *nodemap;
1048         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1049         
1050         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1051         if (ret != 0) {
1052                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1053                 return;
1054         }
1055
1056         for (i=0;i<nodemap->num;i++) {
1057                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1058                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1059                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1060                 }
1061         }
1062
1063         talloc_free(tmp_ctx);
1064 }
1065
1066 /*
1067   handler for recovery master elections
1068 */
1069 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1070                              TDB_DATA data, void *private_data)
1071 {
1072         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1073         int ret;
1074         struct election_message *em = (struct election_message *)data.dptr;
1075         TALLOC_CTX *mem_ctx;
1076
1077         mem_ctx = talloc_new(ctdb);
1078
1079         /* someone called an election. check their election data
1080            and if we disagree and we would rather be the elected node, 
1081            send a new election message to all other nodes
1082          */
1083         if (ctdb_election_win(rec, em)) {
1084                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1085                 if (ret!=0) {
1086                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1087                 }
1088                 talloc_free(mem_ctx);
1089                 /*unban_all_nodes(ctdb);*/
1090                 return;
1091         }
1092
1093         /* release the recmaster lock */
1094         if (em->pnn != ctdb->pnn &&
1095             ctdb->recovery_lock_fd != -1) {
1096                 close(ctdb->recovery_lock_fd);
1097                 ctdb->recovery_lock_fd = -1;
1098                 unban_all_nodes(ctdb);
1099         }
1100
1101         /* ok, let that guy become recmaster then */
1102         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1103         if (ret != 0) {
1104                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1105                 talloc_free(mem_ctx);
1106                 return;
1107         }
1108
1109         /* release any bans */
1110         rec->last_culprit = (uint32_t)-1;
1111         talloc_free(rec->banned_nodes);
1112         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1113         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1114
1115         talloc_free(mem_ctx);
1116         return;
1117 }
1118
1119
1120 /*
1121   force the start of the election process
1122  */
1123 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1124                            struct ctdb_node_map *nodemap)
1125 {
1126         int ret;
1127         struct ctdb_context *ctdb = rec->ctdb;
1128
1129         /* set all nodes to recovery mode to stop all internode traffic */
1130         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1131         if (ret!=0) {
1132                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1133                 return;
1134         }
1135         
1136         ret = send_election_request(rec, mem_ctx, pnn);
1137         if (ret!=0) {
1138                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1139                 return;
1140         }
1141
1142         /* wait for a few seconds to collect all responses */
1143         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1144 }
1145
1146
1147
1148 /*
1149   handler for when a node changes its flags
1150 */
1151 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1152                             TDB_DATA data, void *private_data)
1153 {
1154         int ret;
1155         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1156         struct ctdb_node_map *nodemap=NULL;
1157         TALLOC_CTX *tmp_ctx;
1158         uint32_t changed_flags;
1159         int i;
1160         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1161
1162         if (data.dsize != sizeof(*c)) {
1163                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1164                 return;
1165         }
1166
1167         tmp_ctx = talloc_new(ctdb);
1168         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1169
1170         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1171
1172         for (i=0;i<nodemap->num;i++) {
1173                 if (nodemap->nodes[i].pnn == c->pnn) break;
1174         }
1175
1176         if (i == nodemap->num) {
1177                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1178                 talloc_free(tmp_ctx);
1179                 return;
1180         }
1181
1182         changed_flags = c->old_flags ^ c->new_flags;
1183
1184         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1185            This flag is handled locally based on whether the local node
1186            can communicate with the node or not.
1187         */
1188         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1189         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1190                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1191         }
1192
1193         if (nodemap->nodes[i].flags != c->new_flags) {
1194                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1195         }
1196
1197         nodemap->nodes[i].flags = c->new_flags;
1198
1199         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1200                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1201
1202         if (ret == 0) {
1203                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1204                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1205         }
1206         
1207         if (ret == 0 &&
1208             ctdb->recovery_master == ctdb->pnn &&
1209             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1210             ctdb->vnn) {
1211                 /* Only do the takeover run if the perm disabled or unhealthy
1212                    flags changed since these will cause an ip failover but not
1213                    a recovery.
1214                    If the node became disconnected or banned this will also
1215                    lead to an ip address failover but that is handled 
1216                    during recovery
1217                 */
1218                 if (changed_flags & NODE_FLAGS_DISABLED) {
1219                         rec->need_takeover_run = true;
1220                 }
1221         }
1222
1223         talloc_free(tmp_ctx);
1224 }
1225
1226
1227
1228 struct verify_recmode_normal_data {
1229         uint32_t count;
1230         enum monitor_result status;
1231 };
1232
1233 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1234 {
1235         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private, struct verify_recmode_normal_data);
1236
1237
1238         /* one more node has responded with recmode data*/
1239         rmdata->count--;
1240
1241         /* if we failed to get the recmode, then return an error and let
1242            the main loop try again.
1243         */
1244         if (state->state != CTDB_CONTROL_DONE) {
1245                 if (rmdata->status == MONITOR_OK) {
1246                         rmdata->status = MONITOR_FAILED;
1247                 }
1248                 return;
1249         }
1250
1251         /* if we got a response, then the recmode will be stored in the
1252            status field
1253         */
1254         if (state->status != CTDB_RECOVERY_NORMAL) {
1255                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1256                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1257         }
1258
1259         return;
1260 }
1261
1262
1263 /* verify that all nodes are in normal recovery mode */
1264 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1265 {
1266         struct verify_recmode_normal_data *rmdata;
1267         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1268         struct ctdb_client_control_state *state;
1269         enum monitor_result status;
1270         int j;
1271         
1272         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1273         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1274         rmdata->count  = 0;
1275         rmdata->status = MONITOR_OK;
1276
1277         /* loop over all active nodes and send an async getrecmode call to 
1278            them*/
1279         for (j=0; j<nodemap->num; j++) {
1280                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1281                         continue;
1282                 }
1283                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1284                                         CONTROL_TIMEOUT(), 
1285                                         nodemap->nodes[j].pnn);
1286                 if (state == NULL) {
1287                         /* we failed to send the control, treat this as 
1288                            an error and try again next iteration
1289                         */                      
1290                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1291                         talloc_free(mem_ctx);
1292                         return MONITOR_FAILED;
1293                 }
1294
1295                 /* set up the callback functions */
1296                 state->async.fn = verify_recmode_normal_callback;
1297                 state->async.private = rmdata;
1298
1299                 /* one more control to wait for to complete */
1300                 rmdata->count++;
1301         }
1302
1303
1304         /* now wait for up to the maximum number of seconds allowed
1305            or until all nodes we expect a response from has replied
1306         */
1307         while (rmdata->count > 0) {
1308                 event_loop_once(ctdb->ev);
1309         }
1310
1311         status = rmdata->status;
1312         talloc_free(mem_ctx);
1313         return status;
1314 }
1315
1316
1317 struct verify_recmaster_data {
1318         uint32_t count;
1319         uint32_t pnn;
1320         enum monitor_result status;
1321 };
1322
1323 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1324 {
1325         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private, struct verify_recmaster_data);
1326
1327
1328         /* one more node has responded with recmaster data*/
1329         rmdata->count--;
1330
1331         /* if we failed to get the recmaster, then return an error and let
1332            the main loop try again.
1333         */
1334         if (state->state != CTDB_CONTROL_DONE) {
1335                 if (rmdata->status == MONITOR_OK) {
1336                         rmdata->status = MONITOR_FAILED;
1337                 }
1338                 return;
1339         }
1340
1341         /* if we got a response, then the recmaster will be stored in the
1342            status field
1343         */
1344         if (state->status != rmdata->pnn) {
1345                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1346                 rmdata->status = MONITOR_ELECTION_NEEDED;
1347         }
1348
1349         return;
1350 }
1351
1352
1353 /* verify that all nodes agree that we are the recmaster */
1354 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1355 {
1356         struct verify_recmaster_data *rmdata;
1357         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1358         struct ctdb_client_control_state *state;
1359         enum monitor_result status;
1360         int j;
1361         
1362         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1363         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1364         rmdata->count  = 0;
1365         rmdata->pnn    = pnn;
1366         rmdata->status = MONITOR_OK;
1367
1368         /* loop over all active nodes and send an async getrecmaster call to 
1369            them*/
1370         for (j=0; j<nodemap->num; j++) {
1371                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1372                         continue;
1373                 }
1374                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1375                                         CONTROL_TIMEOUT(),
1376                                         nodemap->nodes[j].pnn);
1377                 if (state == NULL) {
1378                         /* we failed to send the control, treat this as 
1379                            an error and try again next iteration
1380                         */                      
1381                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1382                         talloc_free(mem_ctx);
1383                         return MONITOR_FAILED;
1384                 }
1385
1386                 /* set up the callback functions */
1387                 state->async.fn = verify_recmaster_callback;
1388                 state->async.private = rmdata;
1389
1390                 /* one more control to wait for to complete */
1391                 rmdata->count++;
1392         }
1393
1394
1395         /* now wait for up to the maximum number of seconds allowed
1396            or until all nodes we expect a response from has replied
1397         */
1398         while (rmdata->count > 0) {
1399                 event_loop_once(ctdb->ev);
1400         }
1401
1402         status = rmdata->status;
1403         talloc_free(mem_ctx);
1404         return status;
1405 }
1406
1407
1408 /*
1409   the main monitoring loop
1410  */
1411 static void monitor_cluster(struct ctdb_context *ctdb)
1412 {
1413         uint32_t pnn, num_active, recmaster;
1414         TALLOC_CTX *mem_ctx=NULL;
1415         struct ctdb_node_map *nodemap=NULL;
1416         struct ctdb_node_map *remote_nodemap=NULL;
1417         struct ctdb_vnn_map *vnnmap=NULL;
1418         struct ctdb_vnn_map *remote_vnnmap=NULL;
1419         int i, j, ret;
1420         struct ctdb_recoverd *rec;
1421
1422         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1423         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1424
1425         rec->ctdb = ctdb;
1426         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1427         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1428
1429         rec->priority_time = timeval_current();
1430
1431         /* register a message port for recovery elections */
1432         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1433
1434         /* and one for when nodes are disabled/enabled */
1435         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1436
1437         /* and one for when nodes are banned */
1438         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1439
1440         /* and one for when nodes are unbanned */
1441         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1442         
1443 again:
1444         if (mem_ctx) {
1445                 talloc_free(mem_ctx);
1446                 mem_ctx = NULL;
1447         }
1448         mem_ctx = talloc_new(ctdb);
1449         if (!mem_ctx) {
1450                 DEBUG(0,("Failed to create temporary context\n"));
1451                 exit(-1);
1452         }
1453
1454         /* we only check for recovery once every second */
1455         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1456
1457         /* get relevant tunables */
1458         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1459         if (ret != 0) {
1460                 DEBUG(0,("Failed to get tunables - retrying\n"));
1461                 goto again;
1462         }
1463
1464         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1465         if (pnn == (uint32_t)-1) {
1466                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1467                 goto again;
1468         }
1469
1470         /* get the vnnmap */
1471         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1472         if (ret != 0) {
1473                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1474                 goto again;
1475         }
1476
1477
1478         /* get number of nodes */
1479         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1480         if (ret != 0) {
1481                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1482                 goto again;
1483         }
1484
1485
1486         /* count how many active nodes there are */
1487         num_active = 0;
1488         for (i=0; i<nodemap->num; i++) {
1489                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1490                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1491                 } else {
1492                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1493                 }
1494                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1495                         num_active++;
1496                 }
1497         }
1498
1499
1500         /* check which node is the recovery master */
1501         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1502         if (ret != 0) {
1503                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1504                 goto again;
1505         }
1506
1507         if (recmaster == (uint32_t)-1) {
1508                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1509                 force_election(rec, mem_ctx, pnn, nodemap);
1510                 goto again;
1511         }
1512         
1513         /* verify that the recmaster node is still active */
1514         for (j=0; j<nodemap->num; j++) {
1515                 if (nodemap->nodes[j].pnn==recmaster) {
1516                         break;
1517                 }
1518         }
1519
1520         if (j == nodemap->num) {
1521                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1522                 force_election(rec, mem_ctx, pnn, nodemap);
1523                 goto again;
1524         }
1525
1526         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1527                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1528                 force_election(rec, mem_ctx, pnn, nodemap);
1529                 goto again;
1530         }
1531         
1532
1533         /* if we are not the recmaster then we do not need to check
1534            if recovery is needed
1535          */
1536         if (pnn != recmaster) {
1537                 goto again;
1538         }
1539
1540
1541         /* update the list of public ips that a node can handle for
1542            all connected nodes
1543         */
1544         for (j=0; j<nodemap->num; j++) {
1545                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1546                         continue;
1547                 }
1548                 /* release any existing data */
1549                 if (ctdb->nodes[j]->public_ips) {
1550                         talloc_free(ctdb->nodes[j]->public_ips);
1551                         ctdb->nodes[j]->public_ips = NULL;
1552                 }
1553                 /* grab a new shiny list of public ips from the node */
1554                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1555                         ctdb->nodes[j]->pnn, 
1556                         ctdb->nodes,
1557                         &ctdb->nodes[j]->public_ips)) {
1558                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1559                                 ctdb->nodes[j]->pnn));
1560                         goto again;
1561                 }
1562         }
1563
1564
1565         /* verify that all active nodes agree that we are the recmaster */
1566         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1567         case MONITOR_RECOVERY_NEEDED:
1568                 /* can not happen */
1569                 goto again;
1570         case MONITOR_ELECTION_NEEDED:
1571                 force_election(rec, mem_ctx, pnn, nodemap);
1572                 goto again;
1573         case MONITOR_OK:
1574                 break;
1575         case MONITOR_FAILED:
1576                 goto again;
1577         }
1578
1579
1580         /* verify that all active nodes are in normal mode 
1581            and not in recovery mode 
1582          */
1583         switch (verify_recmode(ctdb, nodemap)) {
1584         case MONITOR_RECOVERY_NEEDED:
1585                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1586                 goto again;
1587         case MONITOR_FAILED:
1588                 goto again;
1589         case MONITOR_ELECTION_NEEDED:
1590                 /* can not happen */
1591         case MONITOR_OK:
1592                 break;
1593         }
1594
1595
1596
1597         /* get the nodemap for all active remote nodes and verify
1598            they are the same as for this node
1599          */
1600         for (j=0; j<nodemap->num; j++) {
1601                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1602                         continue;
1603                 }
1604                 if (nodemap->nodes[j].pnn == pnn) {
1605                         continue;
1606                 }
1607
1608                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1609                                            mem_ctx, &remote_nodemap);
1610                 if (ret != 0) {
1611                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1612                                   nodemap->nodes[j].pnn));
1613                         goto again;
1614                 }
1615
1616                 /* if the nodes disagree on how many nodes there are
1617                    then this is a good reason to try recovery
1618                  */
1619                 if (remote_nodemap->num != nodemap->num) {
1620                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1621                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1622                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1623                         goto again;
1624                 }
1625
1626                 /* if the nodes disagree on which nodes exist and are
1627                    active, then that is also a good reason to do recovery
1628                  */
1629                 for (i=0;i<nodemap->num;i++) {
1630                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1631                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1632                                           nodemap->nodes[j].pnn, i, 
1633                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1634                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1635                                             vnnmap, nodemap->nodes[j].pnn);
1636                                 goto again;
1637                         }
1638                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1639                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1640                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1641                                           nodemap->nodes[j].pnn, i,
1642                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1643                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1644                                             vnnmap, nodemap->nodes[j].pnn);
1645                                 goto again;
1646                         }
1647                 }
1648
1649                 /* update our nodemap flags according to the other
1650                    server - this gets the NODE_FLAGS_DISABLED
1651                    flag. Note that the remote node is authoritative
1652                    for its flags (except CONNECTED, which we know
1653                    matches in this code) */
1654                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1655                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1656                         rec->need_takeover_run = true;
1657                 }
1658         }
1659
1660
1661         /* there better be the same number of lmasters in the vnn map
1662            as there are active nodes or we will have to do a recovery
1663          */
1664         if (vnnmap->size != num_active) {
1665                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1666                           vnnmap->size, num_active));
1667                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1668                 goto again;
1669         }
1670
1671         /* verify that all active nodes in the nodemap also exist in 
1672            the vnnmap.
1673          */
1674         for (j=0; j<nodemap->num; j++) {
1675                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1676                         continue;
1677                 }
1678                 if (nodemap->nodes[j].pnn == pnn) {
1679                         continue;
1680                 }
1681
1682                 for (i=0; i<vnnmap->size; i++) {
1683                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1684                                 break;
1685                         }
1686                 }
1687                 if (i == vnnmap->size) {
1688                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1689                                   nodemap->nodes[j].pnn));
1690                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1691                         goto again;
1692                 }
1693         }
1694
1695         
1696         /* verify that all other nodes have the same vnnmap
1697            and are from the same generation
1698          */
1699         for (j=0; j<nodemap->num; j++) {
1700                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1701                         continue;
1702                 }
1703                 if (nodemap->nodes[j].pnn == pnn) {
1704                         continue;
1705                 }
1706
1707                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1708                                           mem_ctx, &remote_vnnmap);
1709                 if (ret != 0) {
1710                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1711                                   nodemap->nodes[j].pnn));
1712                         goto again;
1713                 }
1714
1715                 /* verify the vnnmap generation is the same */
1716                 if (vnnmap->generation != remote_vnnmap->generation) {
1717                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1718                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1719                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1720                         goto again;
1721                 }
1722
1723                 /* verify the vnnmap size is the same */
1724                 if (vnnmap->size != remote_vnnmap->size) {
1725                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1726                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1727                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1728                         goto again;
1729                 }
1730
1731                 /* verify the vnnmap is the same */
1732                 for (i=0;i<vnnmap->size;i++) {
1733                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1734                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1735                                           nodemap->nodes[j].pnn));
1736                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1737                                             vnnmap, nodemap->nodes[j].pnn);
1738                                 goto again;
1739                         }
1740                 }
1741         }
1742
1743         /* we might need to change who has what IP assigned */
1744         if (rec->need_takeover_run) {
1745                 rec->need_takeover_run = false;
1746                 ret = ctdb_takeover_run(ctdb, nodemap);
1747                 if (ret != 0) {
1748                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1749                         rec->need_takeover_run = true;
1750                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1751                                     vnnmap, nodemap->nodes[j].pnn);
1752                 }
1753         }
1754
1755         goto again;
1756
1757 }
1758
1759 /*
1760   event handler for when the main ctdbd dies
1761  */
1762 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1763                                  uint16_t flags, void *private_data)
1764 {
1765         DEBUG(0,("recovery daemon parent died - exiting\n"));
1766         _exit(1);
1767 }
1768
1769
1770
1771 /*
1772   startup the recovery daemon as a child of the main ctdb daemon
1773  */
1774 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1775 {
1776         int ret;
1777         int fd[2];
1778         pid_t child;
1779
1780         if (pipe(fd) != 0) {
1781                 return -1;
1782         }
1783
1784         child = fork();
1785         if (child == -1) {
1786                 return -1;
1787         }
1788         
1789         if (child != 0) {
1790                 close(fd[0]);
1791                 return 0;
1792         }
1793
1794         close(fd[1]);
1795
1796         /* shutdown the transport */
1797         ctdb->methods->shutdown(ctdb);
1798
1799         /* get a new event context */
1800         talloc_free(ctdb->ev);
1801         ctdb->ev = event_context_init(ctdb);
1802
1803         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1804                      ctdb_recoverd_parent, &fd[0]);     
1805
1806         close(ctdb->daemon.sd);
1807         ctdb->daemon.sd = -1;
1808
1809         srandom(getpid() ^ time(NULL));
1810
1811         /* initialise ctdb */
1812         ret = ctdb_socket_connect(ctdb);
1813         if (ret != 0) {
1814                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1815                 exit(1);
1816         }
1817
1818         monitor_cluster(ctdb);
1819
1820         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1821         return -1;
1822 }