merge from tridge
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb.h"
28 #include "../include/ctdb_private.h"
29
30
31 struct ban_state {
32         struct ctdb_recoverd *rec;
33         uint32_t banned_node;
34 };
35
36 /*
37   private state of recovery daemon
38  */
39 struct ctdb_recoverd {
40         struct ctdb_context *ctdb;
41         uint32_t last_culprit;
42         uint32_t culprit_counter;
43         struct timeval first_recover_time;
44         struct ban_state **banned_nodes;
45         struct timeval priority_time;
46         bool need_takeover_run;
47         bool need_recovery;
48         uint32_t node_flags;
49 };
50
51 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
52 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
53
54 /*
55   unban a node
56  */
57 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
58 {
59         struct ctdb_context *ctdb = rec->ctdb;
60
61         if (!ctdb_validate_pnn(ctdb, pnn)) {
62                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
63                 return;
64         }
65
66         if (rec->banned_nodes[pnn] == NULL) {
67                 return;
68         }
69
70         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
71
72         talloc_free(rec->banned_nodes[pnn]);
73         rec->banned_nodes[pnn] = NULL;
74 }
75
76
77 /*
78   called when a ban has timed out
79  */
80 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
81 {
82         struct ban_state *state = talloc_get_type(p, struct ban_state);
83         struct ctdb_recoverd *rec = state->rec;
84         uint32_t pnn = state->banned_node;
85
86         DEBUG(0,("Node %u is now unbanned\n", pnn));
87         ctdb_unban_node(rec, pnn);
88 }
89
90 /*
91   ban a node for a period of time
92  */
93 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
94 {
95         struct ctdb_context *ctdb = rec->ctdb;
96
97         if (!ctdb_validate_pnn(ctdb, pnn)) {
98                 DEBUG(0,("Bad pnn %u in ctdb_ban_node\n", pnn));
99                 return;
100         }
101
102         if (pnn == ctdb->pnn) {
103                 DEBUG(0,("self ban - lowering our election priority\n"));
104                 /* banning ourselves - lower our election priority */
105                 rec->priority_time = timeval_current();
106         }
107
108         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
109
110         rec->banned_nodes[pnn] = talloc(rec, struct ban_state);
111         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
112
113         rec->banned_nodes[pnn]->rec = rec;
114         rec->banned_nodes[pnn]->banned_node = pnn;
115
116         if (ban_time != 0) {
117                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
118                                 timeval_current_ofs(ban_time, 0),
119                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
120         }
121 }
122
123 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
124
125
126 struct freeze_node_data {
127         uint32_t count;
128         enum monitor_result status;
129 };
130
131
132 static void freeze_node_callback(struct ctdb_client_control_state *state)
133 {
134         struct freeze_node_data *fndata = talloc_get_type(state->async.private_data, struct freeze_node_data);
135
136
137         /* one more node has responded to our freeze node*/
138         fndata->count--;
139
140         /* if we failed to freeze the node, we must trigger another recovery */
141         if ( (state->state != CTDB_CONTROL_DONE) || (state->status != 0) ) {
142                 DEBUG(0, (__location__ " Failed to freeze node:%u. recovery failed\n", state->c->hdr.destnode));
143                 fndata->status = MONITOR_RECOVERY_NEEDED;
144         }
145
146         return;
147 }
148
149
150
151 /* freeze all nodes */
152 static enum monitor_result freeze_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
153 {
154         struct freeze_node_data *fndata;
155         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
156         struct ctdb_client_control_state *state;
157         enum monitor_result status;
158         int j;
159         
160         fndata = talloc(mem_ctx, struct freeze_node_data);
161         CTDB_NO_MEMORY_FATAL(ctdb, fndata);
162         fndata->count  = 0;
163         fndata->status = MONITOR_OK;
164
165         /* loop over all active nodes and send an async freeze call to 
166            them*/
167         for (j=0; j<nodemap->num; j++) {
168                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
169                         continue;
170                 }
171                 state = ctdb_ctrl_freeze_send(ctdb, mem_ctx, 
172                                         CONTROL_TIMEOUT(), 
173                                         nodemap->nodes[j].pnn);
174                 if (state == NULL) {
175                         /* we failed to send the control, treat this as 
176                            an error and try again next iteration
177                         */                      
178                         DEBUG(0,("Failed to call ctdb_ctrl_freeze_send during recovery\n"));
179                         talloc_free(mem_ctx);
180                         return MONITOR_RECOVERY_NEEDED;
181                 }
182
183                 /* set up the callback functions */
184                 state->async.fn = freeze_node_callback;
185                 state->async.private_data = fndata;
186
187                 /* one more control to wait for to complete */
188                 fndata->count++;
189         }
190
191
192         /* now wait for up to the maximum number of seconds allowed
193            or until all nodes we expect a response from has replied
194         */
195         while (fndata->count > 0) {
196                 event_loop_once(ctdb->ev);
197         }
198
199         status = fndata->status;
200         talloc_free(mem_ctx);
201         return status;
202 }
203
204
205 /*
206   change recovery mode on all nodes
207  */
208 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
209 {
210         int j, ret;
211
212         /* freeze all nodes */
213         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
214                 ret = freeze_all_nodes(ctdb, nodemap);
215                 if (ret != MONITOR_OK) {
216                         DEBUG(0, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
217                         return -1;
218                 }
219         }
220
221
222         /* set recovery mode to active on all nodes */
223         for (j=0; j<nodemap->num; j++) {
224                 /* dont change it for nodes that are unavailable */
225                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
226                         continue;
227                 }
228
229                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, rec_mode);
230                 if (ret != 0) {
231                         DEBUG(0, (__location__ " Unable to set recmode on node %u\n", nodemap->nodes[j].pnn));
232                         return -1;
233                 }
234
235                 if (rec_mode == CTDB_RECOVERY_NORMAL) {
236                         ret = ctdb_ctrl_thaw(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn);
237                         if (ret != 0) {
238                                 DEBUG(0, (__location__ " Unable to thaw node %u\n", nodemap->nodes[j].pnn));
239                                 return -1;
240                         }
241                 }
242         }
243
244         return 0;
245 }
246
247 /*
248   change recovery master on all node
249  */
250 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
251 {
252         int j, ret;
253
254         /* set recovery master to pnn on all nodes */
255         for (j=0; j<nodemap->num; j++) {
256                 /* dont change it for nodes that are unavailable */
257                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
258                         continue;
259                 }
260
261                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, pnn);
262                 if (ret != 0) {
263                         DEBUG(0, (__location__ " Unable to set recmaster on node %u\n", nodemap->nodes[j].pnn));
264                         return -1;
265                 }
266         }
267
268         return 0;
269 }
270
271
272 /*
273   ensure all other nodes have attached to any databases that we have
274  */
275 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
276                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
277 {
278         int i, j, db, ret;
279         struct ctdb_dbid_map *remote_dbmap;
280
281         /* verify that all other nodes have all our databases */
282         for (j=0; j<nodemap->num; j++) {
283                 /* we dont need to ourself ourselves */
284                 if (nodemap->nodes[j].pnn == pnn) {
285                         continue;
286                 }
287                 /* dont check nodes that are unavailable */
288                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
289                         continue;
290                 }
291
292                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
293                                          mem_ctx, &remote_dbmap);
294                 if (ret != 0) {
295                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
296                         return -1;
297                 }
298
299                 /* step through all local databases */
300                 for (db=0; db<dbmap->num;db++) {
301                         const char *name;
302
303
304                         for (i=0;i<remote_dbmap->num;i++) {
305                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
306                                         break;
307                                 }
308                         }
309                         /* the remote node already have this database */
310                         if (i!=remote_dbmap->num) {
311                                 continue;
312                         }
313                         /* ok so we need to create this database */
314                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
315                                             mem_ctx, &name);
316                         if (ret != 0) {
317                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
318                                 return -1;
319                         }
320                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
321                                            mem_ctx, name, dbmap->dbs[db].persistent);
322                         if (ret != 0) {
323                                 DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
324                                 return -1;
325                         }
326                 }
327         }
328
329         return 0;
330 }
331
332
333 /*
334   ensure we are attached to any databases that anyone else is attached to
335  */
336 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
337                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
338 {
339         int i, j, db, ret;
340         struct ctdb_dbid_map *remote_dbmap;
341
342         /* verify that we have all database any other node has */
343         for (j=0; j<nodemap->num; j++) {
344                 /* we dont need to ourself ourselves */
345                 if (nodemap->nodes[j].pnn == pnn) {
346                         continue;
347                 }
348                 /* dont check nodes that are unavailable */
349                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
350                         continue;
351                 }
352
353                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
354                                          mem_ctx, &remote_dbmap);
355                 if (ret != 0) {
356                         DEBUG(0, (__location__ " Unable to get dbids from node %u\n", pnn));
357                         return -1;
358                 }
359
360                 /* step through all databases on the remote node */
361                 for (db=0; db<remote_dbmap->num;db++) {
362                         const char *name;
363
364                         for (i=0;i<(*dbmap)->num;i++) {
365                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
366                                         break;
367                                 }
368                         }
369                         /* we already have this db locally */
370                         if (i!=(*dbmap)->num) {
371                                 continue;
372                         }
373                         /* ok so we need to create this database and
374                            rebuild dbmap
375                          */
376                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
377                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
378                         if (ret != 0) {
379                                 DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
380                                           nodemap->nodes[j].pnn));
381                                 return -1;
382                         }
383                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
384                                            remote_dbmap->dbs[db].persistent);
385                         if (ret != 0) {
386                                 DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
387                                 return -1;
388                         }
389                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
390                         if (ret != 0) {
391                                 DEBUG(0, (__location__ " Unable to reread dbmap on node %u\n", pnn));
392                                 return -1;
393                         }
394                 }
395         }
396
397         return 0;
398 }
399
400
401 /*
402   pull all the remote database contents into ours
403  */
404 static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
405                                      uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
406 {
407         int i, j, ret;
408
409         /* pull all records from all other nodes across onto this node
410            (this merges based on rsn)
411         */
412         for (i=0;i<dbmap->num;i++) {
413                 for (j=0; j<nodemap->num; j++) {
414                         /* we dont need to merge with ourselves */
415                         if (nodemap->nodes[j].pnn == pnn) {
416                                 continue;
417                         }
418                         /* dont merge from nodes that are unavailable */
419                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
420                                 continue;
421                         }
422                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
423                                                pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
424                         if (ret != 0) {
425                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
426                                           nodemap->nodes[j].pnn, pnn));
427                                 return -1;
428                         }
429                 }
430         }
431
432         return 0;
433 }
434
435
436 /*
437   change the dmaster on all databases to point to us
438  */
439 static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
440                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
441 {
442         int i, j, ret;
443
444         /* update dmaster to point to this node for all databases/nodes */
445         for (i=0;i<dbmap->num;i++) {
446                 for (j=0; j<nodemap->num; j++) {
447                         /* dont repoint nodes that are unavailable */
448                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
449                                 continue;
450                         }
451                         ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
452                                                    ctdb, dbmap->dbs[i].dbid, pnn);
453                         if (ret != 0) {
454                                 DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", 
455                                           nodemap->nodes[j].pnn, dbmap->dbs[i].dbid));
456                                 return -1;
457                         }
458                 }
459         }
460
461         return 0;
462 }
463
464
465 /*
466   update flags on all active nodes
467  */
468 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
469 {
470         int i;
471         for (i=0;i<nodemap->num;i++) {
472                 struct ctdb_node_flag_change c;
473                 TDB_DATA data;
474
475                 c.pnn = nodemap->nodes[i].pnn;
476                 c.old_flags = nodemap->nodes[i].flags;
477                 c.new_flags = nodemap->nodes[i].flags;
478
479                 data.dptr = (uint8_t *)&c;
480                 data.dsize = sizeof(c);
481
482                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
483                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
484
485         }
486         return 0;
487 }
488
489 /*
490   vacuum one database
491  */
492 static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
493 {
494         uint64_t max_rsn;
495         int ret, i;
496
497         /* find max rsn on our local node for this db */
498         ret = ctdb_ctrl_get_max_rsn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, db_id, &max_rsn);
499         if (ret != 0) {
500                 return -1;
501         }
502
503         /* set rsn on non-empty records to max_rsn+1 */
504         for (i=0;i<nodemap->num;i++) {
505                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
506                         continue;
507                 }
508                 ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
509                                                  db_id, max_rsn+1);
510                 if (ret != 0) {
511                         DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
512                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
513                         return -1;
514                 }
515         }
516
517         /* delete records with rsn < max_rsn+1 on all nodes */
518         for (i=0;i<nodemap->num;i++) {
519                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
520                         continue;
521                 }
522                 ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn,
523                                                  db_id, max_rsn+1);
524                 if (ret != 0) {
525                         DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
526                                  nodemap->nodes[i].pnn, (unsigned long long)max_rsn+1));
527                         return -1;
528                 }
529         }
530
531
532         return 0;
533 }
534
535
536 /*
537   vacuum all attached databases
538  */
539 static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
540                                 struct ctdb_dbid_map *dbmap)
541 {
542         int i;
543
544         /* update dmaster to point to this node for all databases/nodes */
545         for (i=0;i<dbmap->num;i++) {
546                 if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
547                         return -1;
548                 }
549         }
550         return 0;
551 }
552
553
554 /*
555   push out all our database contents to all other nodes
556  */
557 static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
558                                     uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
559 {
560         int i, j, ret;
561
562         /* push all records out to the nodes again */
563         for (i=0;i<dbmap->num;i++) {
564                 for (j=0; j<nodemap->num; j++) {
565                         /* we dont need to push to ourselves */
566                         if (nodemap->nodes[j].pnn == pnn) {
567                                 continue;
568                         }
569                         /* dont push to nodes that are unavailable */
570                         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
571                                 continue;
572                         }
573                         ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
574                                                dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
575                         if (ret != 0) {
576                                 DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
577                                           pnn, nodemap->nodes[j].pnn));
578                                 return -1;
579                         }
580                 }
581         }
582
583         return 0;
584 }
585
586
587 /*
588   ensure all nodes have the same vnnmap we do
589  */
590 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
591                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
592 {
593         int j, ret;
594
595         /* push the new vnn map out to all the nodes */
596         for (j=0; j<nodemap->num; j++) {
597                 /* dont push to nodes that are unavailable */
598                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
599                         continue;
600                 }
601
602                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
603                 if (ret != 0) {
604                         DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
605                         return -1;
606                 }
607         }
608
609         return 0;
610 }
611
612
613 /*
614   handler for when the admin bans a node
615 */
616 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
617                         TDB_DATA data, void *private_data)
618 {
619         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
620         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
621         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
622         uint32_t recmaster;
623         int ret;
624
625         if (data.dsize != sizeof(*b)) {
626                 DEBUG(0,("Bad data in ban_handler\n"));
627                 talloc_free(mem_ctx);
628                 return;
629         }
630
631         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
632         if (ret != 0) {
633                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
634                 talloc_free(mem_ctx);
635                 return;
636         }
637
638         if (recmaster != ctdb->pnn) {
639                 DEBUG(0,("We are not the recmaster - ignoring ban request\n"));
640                 talloc_free(mem_ctx);
641                 return;
642         }
643
644         DEBUG(0,("Node %u has been banned for %u seconds by the administrator\n", 
645                  b->pnn, b->ban_time));
646         ctdb_ban_node(rec, b->pnn, b->ban_time);
647         talloc_free(mem_ctx);
648 }
649
650 /*
651   handler for when the admin unbans a node
652 */
653 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
654                           TDB_DATA data, void *private_data)
655 {
656         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
657         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
658         uint32_t pnn;
659         int ret;
660         uint32_t recmaster;
661
662         if (data.dsize != sizeof(uint32_t)) {
663                 DEBUG(0,("Bad data in unban_handler\n"));
664                 talloc_free(mem_ctx);
665                 return;
666         }
667         pnn = *(uint32_t *)data.dptr;
668
669         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
670         if (ret != 0) {
671                 DEBUG(0,(__location__ " Failed to find the recmaster\n"));
672                 talloc_free(mem_ctx);
673                 return;
674         }
675
676         if (recmaster != ctdb->pnn) {
677                 DEBUG(0,("We are not the recmaster - ignoring unban request\n"));
678                 talloc_free(mem_ctx);
679                 return;
680         }
681
682         DEBUG(0,("Node %u has been unbanned by the administrator\n", pnn));
683         ctdb_unban_node(rec, pnn);
684         talloc_free(mem_ctx);
685 }
686
687
688
689 /*
690   called when ctdb_wait_timeout should finish
691  */
692 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
693                               struct timeval yt, void *p)
694 {
695         uint32_t *timed_out = (uint32_t *)p;
696         (*timed_out) = 1;
697 }
698
699 /*
700   wait for a given number of seconds
701  */
702 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
703 {
704         uint32_t timed_out = 0;
705         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
706         while (!timed_out) {
707                 event_loop_once(ctdb->ev);
708         }
709 }
710
711 /* Create a new random generation ip. 
712    The generation id can not be the INVALID_GENERATION id
713 */
714 static uint32_t new_generation(void)
715 {
716         uint32_t generation;
717
718         while (1) {
719                 generation = random();
720
721                 if (generation != INVALID_GENERATION) {
722                         break;
723                 }
724         }
725
726         return generation;
727 }
728
729 /*
730   remember the trouble maker
731  */
732 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
733 {
734         struct ctdb_context *ctdb = rec->ctdb;
735
736         if (rec->last_culprit != culprit ||
737             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
738                 DEBUG(0,("New recovery culprit %u\n", culprit));
739                 /* either a new node is the culprit, or we've decide to forgive them */
740                 rec->last_culprit = culprit;
741                 rec->first_recover_time = timeval_current();
742                 rec->culprit_counter = 0;
743         }
744         rec->culprit_counter++;
745 }
746                 
747 /*
748   we are the recmaster, and recovery is needed - start a recovery run
749  */
750 static int do_recovery(struct ctdb_recoverd *rec, 
751                        TALLOC_CTX *mem_ctx, uint32_t pnn, uint32_t num_active,
752                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
753                        uint32_t culprit)
754 {
755         struct ctdb_context *ctdb = rec->ctdb;
756         int i, j, ret;
757         uint32_t generation;
758         struct ctdb_dbid_map *dbmap;
759
760         /* if recovery fails, force it again */
761         rec->need_recovery = true;
762
763         ctdb_set_culprit(rec, culprit);
764
765         if (rec->culprit_counter > 2*nodemap->num) {
766                 DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
767                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
768                          ctdb->tunable.recovery_ban_period));
769                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
770         }
771
772         if (!ctdb_recovery_lock(ctdb, true)) {
773                 ctdb_set_culprit(rec, pnn);
774                 DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
775                 return -1;
776         }
777
778         /* set recovery mode to active on all nodes */
779         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
780         if (ret!=0) {
781                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
782                 return -1;
783         }
784
785         DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
786
787         /* pick a new generation number */
788         generation = new_generation();
789
790         /* change the vnnmap on this node to use the new generation 
791            number but not on any other nodes.
792            this guarantees that if we abort the recovery prematurely
793            for some reason (a node stops responding?)
794            that we can just return immediately and we will reenter
795            recovery shortly again.
796            I.e. we deliberately leave the cluster with an inconsistent
797            generation id to allow us to abort recovery at any stage and
798            just restart it from scratch.
799          */
800         vnnmap->generation = generation;
801         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
802         if (ret != 0) {
803                 DEBUG(0, (__location__ " Unable to set vnnmap for node %u\n", pnn));
804                 return -1;
805         }
806
807         /* get a list of all databases */
808         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
809         if (ret != 0) {
810                 DEBUG(0, (__location__ " Unable to get dbids from node :%u\n", pnn));
811                 return -1;
812         }
813
814
815
816         /* verify that all other nodes have all our databases */
817         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
818         if (ret != 0) {
819                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
820                 return -1;
821         }
822
823         /* verify that we have all the databases any other node has */
824         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
825         if (ret != 0) {
826                 DEBUG(0, (__location__ " Unable to create missing local databases\n"));
827                 return -1;
828         }
829
830
831
832         /* verify that all other nodes have all our databases */
833         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
834         if (ret != 0) {
835                 DEBUG(0, (__location__ " Unable to create missing remote databases\n"));
836                 return -1;
837         }
838
839
840         DEBUG(1, (__location__ " Recovery - created remote databases\n"));
841
842         /* pull all remote databases onto the local node */
843         ret = pull_all_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
844         if (ret != 0) {
845                 DEBUG(0, (__location__ " Unable to pull remote databases\n"));
846                 return -1;
847         }
848
849         DEBUG(1, (__location__ " Recovery - pulled remote databases\n"));
850
851         /* push all local databases to the remote nodes */
852         ret = push_all_local_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
853         if (ret != 0) {
854                 DEBUG(0, (__location__ " Unable to push local databases\n"));
855                 return -1;
856         }
857
858         DEBUG(1, (__location__ " Recovery - pushed remote databases\n"));
859
860         /* build a new vnn map with all the currently active and
861            unbanned nodes */
862         generation = new_generation();
863         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
864         CTDB_NO_MEMORY(ctdb, vnnmap);
865         vnnmap->generation = generation;
866         vnnmap->size = num_active;
867         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
868         for (i=j=0;i<nodemap->num;i++) {
869                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
870                         vnnmap->map[j++] = nodemap->nodes[i].pnn;
871                 }
872         }
873
874
875
876         /* update to the new vnnmap on all nodes */
877         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
878         if (ret != 0) {
879                 DEBUG(0, (__location__ " Unable to update vnnmap on all nodes\n"));
880                 return -1;
881         }
882
883         DEBUG(1, (__location__ " Recovery - updated vnnmap\n"));
884
885         /* update recmaster to point to us for all nodes */
886         ret = set_recovery_master(ctdb, nodemap, pnn);
887         if (ret!=0) {
888                 DEBUG(0, (__location__ " Unable to set recovery master\n"));
889                 return -1;
890         }
891
892         DEBUG(1, (__location__ " Recovery - updated recmaster\n"));
893
894         /* repoint all local and remote database records to the local
895            node as being dmaster
896          */
897         ret = update_dmaster_on_all_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
898         if (ret != 0) {
899                 DEBUG(0, (__location__ " Unable to update dmaster on all databases\n"));
900                 return -1;
901         }
902
903         DEBUG(1, (__location__ " Recovery - updated dmaster on all databases\n"));
904
905         /*
906           update all nodes to have the same flags that we have
907          */
908         ret = update_flags_on_all_nodes(ctdb, nodemap);
909         if (ret != 0) {
910                 DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
911                 return -1;
912         }
913         
914         DEBUG(1, (__location__ " Recovery - updated flags\n"));
915
916         /*
917           run a vacuum operation on empty records
918          */
919         ret = vacuum_all_databases(ctdb, nodemap, dbmap);
920         if (ret != 0) {
921                 DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
922                 return -1;
923         }
924
925         DEBUG(1, (__location__ " Recovery - vacuumed all databases\n"));
926
927         /*
928           if enabled, tell nodes to takeover their public IPs
929          */
930         if (ctdb->vnn) {
931                 rec->need_takeover_run = false;
932                 ret = ctdb_takeover_run(ctdb, nodemap);
933                 if (ret != 0) {
934                         DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
935                         return -1;
936                 }
937                 DEBUG(1, (__location__ " Recovery - done takeover\n"));
938         }
939
940         for (i=0;i<dbmap->num;i++) {
941                 DEBUG(0,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
942         }
943
944         /* disable recovery mode */
945         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
946         if (ret!=0) {
947                 DEBUG(0, (__location__ " Unable to set recovery mode to normal on cluster\n"));
948                 return -1;
949         }
950
951         /* send a message to all clients telling them that the cluster 
952            has been reconfigured */
953         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
954
955         DEBUG(0, (__location__ " Recovery complete\n"));
956
957         rec->need_recovery = false;
958
959         /* We just finished a recovery successfully. 
960            We now wait for rerecovery_timeout before we allow 
961            another recovery to take place.
962         */
963         DEBUG(0, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
964         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
965         DEBUG(0, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
966
967         return 0;
968 }
969
970
971 /*
972   elections are won by first checking the number of connected nodes, then
973   the priority time, then the pnn
974  */
975 struct election_message {
976         uint32_t num_connected;
977         struct timeval priority_time;
978         uint32_t pnn;
979         uint32_t node_flags;
980 };
981
982 /*
983   form this nodes election data
984  */
985 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
986 {
987         int ret, i;
988         struct ctdb_node_map *nodemap;
989         struct ctdb_context *ctdb = rec->ctdb;
990
991         ZERO_STRUCTP(em);
992
993         em->pnn = rec->ctdb->pnn;
994         em->priority_time = rec->priority_time;
995         em->node_flags = rec->node_flags;
996
997         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
998         if (ret != 0) {
999                 return;
1000         }
1001
1002         for (i=0;i<nodemap->num;i++) {
1003                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1004                         em->num_connected++;
1005                 }
1006         }
1007         talloc_free(nodemap);
1008 }
1009
1010 /*
1011   see if the given election data wins
1012  */
1013 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1014 {
1015         struct election_message myem;
1016         int cmp = 0;
1017
1018         ctdb_election_data(rec, &myem);
1019
1020         /* try to use a unbanned node */
1021         if ((em->node_flags & NODE_FLAGS_BANNED) &&
1022             !(myem.node_flags & NODE_FLAGS_BANNED)) {
1023                 cmp = 1;
1024         }
1025         if (!(em->node_flags & NODE_FLAGS_BANNED) &&
1026             (myem.node_flags & NODE_FLAGS_BANNED)) {
1027                 cmp = -1;
1028         }
1029
1030         /* try to use a healthy node */
1031         if (cmp == 0) {
1032                 if ((em->node_flags & NODE_FLAGS_UNHEALTHY) &&
1033                     !(myem.node_flags & NODE_FLAGS_UNHEALTHY)) {
1034                         cmp = 1;
1035                 }
1036                 if (!(em->node_flags & NODE_FLAGS_UNHEALTHY) &&
1037                     (myem.node_flags & NODE_FLAGS_UNHEALTHY)) {
1038                         cmp = -1;
1039                 }
1040         }
1041
1042         /* try to use the most connected node */
1043         if (cmp == 0) {
1044                 cmp = (int)myem.num_connected - (int)em->num_connected;
1045         }
1046
1047         /* then the longest running node */
1048         if (cmp == 0) {
1049                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1050         }
1051
1052         if (cmp == 0) {
1053                 cmp = (int)myem.pnn - (int)em->pnn;
1054         }
1055
1056         return cmp > 0;
1057 }
1058
1059 /*
1060   send out an election request
1061  */
1062 static int send_election_request(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn)
1063 {
1064         int ret;
1065         TDB_DATA election_data;
1066         struct election_message emsg;
1067         uint64_t srvid;
1068         struct ctdb_context *ctdb = rec->ctdb;
1069         
1070         srvid = CTDB_SRVID_RECOVERY;
1071
1072         ctdb_election_data(rec, &emsg);
1073
1074         election_data.dsize = sizeof(struct election_message);
1075         election_data.dptr  = (unsigned char *)&emsg;
1076
1077
1078         /* first we assume we will win the election and set 
1079            recoverymaster to be ourself on the current node
1080          */
1081         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1082         if (ret != 0) {
1083                 DEBUG(0, (__location__ " failed to send recmaster election request\n"));
1084                 return -1;
1085         }
1086
1087
1088         /* send an election message to all active nodes */
1089         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1090
1091         return 0;
1092 }
1093
1094 /*
1095   this function will unban all nodes in the cluster
1096 */
1097 static void unban_all_nodes(struct ctdb_context *ctdb)
1098 {
1099         int ret, i;
1100         struct ctdb_node_map *nodemap;
1101         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1102         
1103         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1104         if (ret != 0) {
1105                 DEBUG(0,(__location__ " failed to get nodemap to unban all nodes\n"));
1106                 return;
1107         }
1108
1109         for (i=0;i<nodemap->num;i++) {
1110                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1111                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1112                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1113                 }
1114         }
1115
1116         talloc_free(tmp_ctx);
1117 }
1118
1119 /*
1120   handler for recovery master elections
1121 */
1122 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1123                              TDB_DATA data, void *private_data)
1124 {
1125         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1126         int ret;
1127         struct election_message *em = (struct election_message *)data.dptr;
1128         TALLOC_CTX *mem_ctx;
1129
1130         mem_ctx = talloc_new(ctdb);
1131
1132         /* someone called an election. check their election data
1133            and if we disagree and we would rather be the elected node, 
1134            send a new election message to all other nodes
1135          */
1136         if (ctdb_election_win(rec, em)) {
1137                 ret = send_election_request(rec, mem_ctx, ctdb_get_pnn(ctdb));
1138                 if (ret!=0) {
1139                         DEBUG(0, (__location__ " failed to initiate recmaster election"));
1140                 }
1141                 talloc_free(mem_ctx);
1142                 /*unban_all_nodes(ctdb);*/
1143                 return;
1144         }
1145
1146         /* release the recmaster lock */
1147         if (em->pnn != ctdb->pnn &&
1148             ctdb->recovery_lock_fd != -1) {
1149                 close(ctdb->recovery_lock_fd);
1150                 ctdb->recovery_lock_fd = -1;
1151                 unban_all_nodes(ctdb);
1152         }
1153
1154         /* ok, let that guy become recmaster then */
1155         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1156         if (ret != 0) {
1157                 DEBUG(0, (__location__ " failed to send recmaster election request"));
1158                 talloc_free(mem_ctx);
1159                 return;
1160         }
1161
1162         /* release any bans */
1163         rec->last_culprit = (uint32_t)-1;
1164         talloc_free(rec->banned_nodes);
1165         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1166         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1167
1168         talloc_free(mem_ctx);
1169         return;
1170 }
1171
1172
1173 /*
1174   force the start of the election process
1175  */
1176 static void force_election(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx, uint32_t pnn, 
1177                            struct ctdb_node_map *nodemap)
1178 {
1179         int ret;
1180         struct ctdb_context *ctdb = rec->ctdb;
1181
1182         /* set all nodes to recovery mode to stop all internode traffic */
1183         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1184         if (ret!=0) {
1185                 DEBUG(0, (__location__ " Unable to set recovery mode to active on cluster\n"));
1186                 return;
1187         }
1188         
1189         ret = send_election_request(rec, mem_ctx, pnn);
1190         if (ret!=0) {
1191                 DEBUG(0, (__location__ " failed to initiate recmaster election"));
1192                 return;
1193         }
1194
1195         /* wait for a few seconds to collect all responses */
1196         ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
1197 }
1198
1199
1200
1201 /*
1202   handler for when a node changes its flags
1203 */
1204 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1205                             TDB_DATA data, void *private_data)
1206 {
1207         int ret;
1208         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1209         struct ctdb_node_map *nodemap=NULL;
1210         TALLOC_CTX *tmp_ctx;
1211         uint32_t changed_flags;
1212         int i;
1213         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1214
1215         if (data.dsize != sizeof(*c)) {
1216                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1217                 return;
1218         }
1219
1220         tmp_ctx = talloc_new(ctdb);
1221         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1222
1223         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1224
1225         for (i=0;i<nodemap->num;i++) {
1226                 if (nodemap->nodes[i].pnn == c->pnn) break;
1227         }
1228
1229         if (i == nodemap->num) {
1230                 DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1231                 talloc_free(tmp_ctx);
1232                 return;
1233         }
1234
1235         changed_flags = c->old_flags ^ c->new_flags;
1236
1237         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1238            This flag is handled locally based on whether the local node
1239            can communicate with the node or not.
1240         */
1241         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1242         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1243                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1244         }
1245
1246         if (nodemap->nodes[i].flags != c->new_flags) {
1247                 DEBUG(0,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1248         }
1249
1250         nodemap->nodes[i].flags = c->new_flags;
1251
1252         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1253                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1254
1255         if (ret == 0) {
1256                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1257                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1258         }
1259         
1260         if (ret == 0 &&
1261             ctdb->recovery_master == ctdb->pnn &&
1262             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
1263             ctdb->vnn) {
1264                 /* Only do the takeover run if the perm disabled or unhealthy
1265                    flags changed since these will cause an ip failover but not
1266                    a recovery.
1267                    If the node became disconnected or banned this will also
1268                    lead to an ip address failover but that is handled 
1269                    during recovery
1270                 */
1271                 if (changed_flags & NODE_FLAGS_DISABLED) {
1272                         rec->need_takeover_run = true;
1273                 }
1274         }
1275
1276         talloc_free(tmp_ctx);
1277 }
1278
1279
1280
1281 struct verify_recmode_normal_data {
1282         uint32_t count;
1283         enum monitor_result status;
1284 };
1285
1286 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1287 {
1288         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1289
1290
1291         /* one more node has responded with recmode data*/
1292         rmdata->count--;
1293
1294         /* if we failed to get the recmode, then return an error and let
1295            the main loop try again.
1296         */
1297         if (state->state != CTDB_CONTROL_DONE) {
1298                 if (rmdata->status == MONITOR_OK) {
1299                         rmdata->status = MONITOR_FAILED;
1300                 }
1301                 return;
1302         }
1303
1304         /* if we got a response, then the recmode will be stored in the
1305            status field
1306         */
1307         if (state->status != CTDB_RECOVERY_NORMAL) {
1308                 DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1309                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1310         }
1311
1312         return;
1313 }
1314
1315
1316 /* verify that all nodes are in normal recovery mode */
1317 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1318 {
1319         struct verify_recmode_normal_data *rmdata;
1320         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1321         struct ctdb_client_control_state *state;
1322         enum monitor_result status;
1323         int j;
1324         
1325         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1326         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1327         rmdata->count  = 0;
1328         rmdata->status = MONITOR_OK;
1329
1330         /* loop over all active nodes and send an async getrecmode call to 
1331            them*/
1332         for (j=0; j<nodemap->num; j++) {
1333                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1334                         continue;
1335                 }
1336                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1337                                         CONTROL_TIMEOUT(), 
1338                                         nodemap->nodes[j].pnn);
1339                 if (state == NULL) {
1340                         /* we failed to send the control, treat this as 
1341                            an error and try again next iteration
1342                         */                      
1343                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1344                         talloc_free(mem_ctx);
1345                         return MONITOR_FAILED;
1346                 }
1347
1348                 /* set up the callback functions */
1349                 state->async.fn = verify_recmode_normal_callback;
1350                 state->async.private_data = rmdata;
1351
1352                 /* one more control to wait for to complete */
1353                 rmdata->count++;
1354         }
1355
1356
1357         /* now wait for up to the maximum number of seconds allowed
1358            or until all nodes we expect a response from has replied
1359         */
1360         while (rmdata->count > 0) {
1361                 event_loop_once(ctdb->ev);
1362         }
1363
1364         status = rmdata->status;
1365         talloc_free(mem_ctx);
1366         return status;
1367 }
1368
1369
1370 struct verify_recmaster_data {
1371         uint32_t count;
1372         uint32_t pnn;
1373         enum monitor_result status;
1374 };
1375
1376 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
1377 {
1378         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
1379
1380
1381         /* one more node has responded with recmaster data*/
1382         rmdata->count--;
1383
1384         /* if we failed to get the recmaster, then return an error and let
1385            the main loop try again.
1386         */
1387         if (state->state != CTDB_CONTROL_DONE) {
1388                 if (rmdata->status == MONITOR_OK) {
1389                         rmdata->status = MONITOR_FAILED;
1390                 }
1391                 return;
1392         }
1393
1394         /* if we got a response, then the recmaster will be stored in the
1395            status field
1396         */
1397         if (state->status != rmdata->pnn) {
1398                 DEBUG(0,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
1399                 rmdata->status = MONITOR_ELECTION_NEEDED;
1400         }
1401
1402         return;
1403 }
1404
1405
1406 /* verify that all nodes agree that we are the recmaster */
1407 static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
1408 {
1409         struct verify_recmaster_data *rmdata;
1410         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1411         struct ctdb_client_control_state *state;
1412         enum monitor_result status;
1413         int j;
1414         
1415         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
1416         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1417         rmdata->count  = 0;
1418         rmdata->pnn    = pnn;
1419         rmdata->status = MONITOR_OK;
1420
1421         /* loop over all active nodes and send an async getrecmaster call to 
1422            them*/
1423         for (j=0; j<nodemap->num; j++) {
1424                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1425                         continue;
1426                 }
1427                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
1428                                         CONTROL_TIMEOUT(),
1429                                         nodemap->nodes[j].pnn);
1430                 if (state == NULL) {
1431                         /* we failed to send the control, treat this as 
1432                            an error and try again next iteration
1433                         */                      
1434                         DEBUG(0,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
1435                         talloc_free(mem_ctx);
1436                         return MONITOR_FAILED;
1437                 }
1438
1439                 /* set up the callback functions */
1440                 state->async.fn = verify_recmaster_callback;
1441                 state->async.private_data = rmdata;
1442
1443                 /* one more control to wait for to complete */
1444                 rmdata->count++;
1445         }
1446
1447
1448         /* now wait for up to the maximum number of seconds allowed
1449            or until all nodes we expect a response from has replied
1450         */
1451         while (rmdata->count > 0) {
1452                 event_loop_once(ctdb->ev);
1453         }
1454
1455         status = rmdata->status;
1456         talloc_free(mem_ctx);
1457         return status;
1458 }
1459
1460
1461 /*
1462   the main monitoring loop
1463  */
1464 static void monitor_cluster(struct ctdb_context *ctdb)
1465 {
1466         uint32_t pnn, num_active, recmaster;
1467         TALLOC_CTX *mem_ctx=NULL;
1468         struct ctdb_node_map *nodemap=NULL;
1469         struct ctdb_node_map *remote_nodemap=NULL;
1470         struct ctdb_vnn_map *vnnmap=NULL;
1471         struct ctdb_vnn_map *remote_vnnmap=NULL;
1472         int i, j, ret;
1473         struct ctdb_recoverd *rec;
1474         struct ctdb_all_public_ips *ips;
1475         char c;
1476
1477         rec = talloc_zero(ctdb, struct ctdb_recoverd);
1478         CTDB_NO_MEMORY_FATAL(ctdb, rec);
1479
1480         rec->ctdb = ctdb;
1481         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1482         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1483
1484         rec->priority_time = timeval_current();
1485
1486         /* register a message port for recovery elections */
1487         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
1488
1489         /* and one for when nodes are disabled/enabled */
1490         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
1491
1492         /* and one for when nodes are banned */
1493         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
1494
1495         /* and one for when nodes are unbanned */
1496         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
1497         
1498 again:
1499         if (mem_ctx) {
1500                 talloc_free(mem_ctx);
1501                 mem_ctx = NULL;
1502         }
1503         mem_ctx = talloc_new(ctdb);
1504         if (!mem_ctx) {
1505                 DEBUG(0,("Failed to create temporary context\n"));
1506                 exit(-1);
1507         }
1508
1509         /* we only check for recovery once every second */
1510         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
1511
1512         /* get relevant tunables */
1513         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
1514         if (ret != 0) {
1515                 DEBUG(0,("Failed to get tunables - retrying\n"));
1516                 goto again;
1517         }
1518
1519         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1520         if (pnn == (uint32_t)-1) {
1521                 DEBUG(0,("Failed to get local pnn - retrying\n"));
1522                 goto again;
1523         }
1524
1525         /* get the vnnmap */
1526         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
1527         if (ret != 0) {
1528                 DEBUG(0, (__location__ " Unable to get vnnmap from node %u\n", pnn));
1529                 goto again;
1530         }
1531
1532
1533         /* get number of nodes */
1534         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &nodemap);
1535         if (ret != 0) {
1536                 DEBUG(0, (__location__ " Unable to get nodemap from node %u\n", pnn));
1537                 goto again;
1538         }
1539
1540         /* remember our own node flags */
1541         rec->node_flags = nodemap->nodes[pnn].flags;
1542
1543         /* count how many active nodes there are */
1544         num_active = 0;
1545         for (i=0; i<nodemap->num; i++) {
1546                 if (rec->banned_nodes[nodemap->nodes[i].pnn] != NULL) {
1547                         nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
1548                 } else {
1549                         nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
1550                 }
1551                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1552                         num_active++;
1553                 }
1554         }
1555
1556
1557         /* check which node is the recovery master */
1558         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &recmaster);
1559         if (ret != 0) {
1560                 DEBUG(0, (__location__ " Unable to get recmaster from node %u\n", pnn));
1561                 goto again;
1562         }
1563
1564         if (recmaster == (uint32_t)-1) {
1565                 DEBUG(0,(__location__ " Initial recovery master set - forcing election\n"));
1566                 force_election(rec, mem_ctx, pnn, nodemap);
1567                 goto again;
1568         }
1569         
1570         /* verify that the recmaster node is still active */
1571         for (j=0; j<nodemap->num; j++) {
1572                 if (nodemap->nodes[j].pnn==recmaster) {
1573                         break;
1574                 }
1575         }
1576
1577         if (j == nodemap->num) {
1578                 DEBUG(0, ("Recmaster node %u not in list. Force reelection\n", recmaster));
1579                 force_election(rec, mem_ctx, pnn, nodemap);
1580                 goto again;
1581         }
1582
1583         if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1584                 DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
1585                 force_election(rec, mem_ctx, pnn, nodemap);
1586                 goto again;
1587         }
1588
1589         /* verify that the public ip address allocation is consistent */
1590         if (ctdb->vnn != NULL) {
1591                 ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
1592                 if (ret != 0) {
1593                         DEBUG(0, ("Unable to get public ips from node %u\n", i));
1594                         goto again;
1595                 }
1596                 for (j=0; j<ips->num; j++) {
1597                         /* verify that we have the ip addresses we should have
1598                            and we dont have ones we shouldnt have.
1599                            if we find an inconsistency we set recmode to
1600                            active on the local node and wait for the recmaster
1601                            to do a full blown recovery
1602                         */
1603                         if (ips->ips[j].pnn == pnn) {
1604                                 if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
1605                                         DEBUG(0,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1606                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1607                                         if (ret != 0) {
1608                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1609                                                 goto again;
1610                                         }
1611                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1612                                         if (ret != 0) {
1613                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1614                                                 goto again;
1615                                         }
1616                                 }
1617                         } else {
1618                                 if (ctdb_sys_have_ip(ips->ips[j].sin)) {
1619                                         DEBUG(0,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
1620                                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
1621                                         if (ret != 0) {
1622                                                 DEBUG(0,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
1623                                                 goto again;
1624                                         }
1625                                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
1626                                         if (ret != 0) {
1627                                                 DEBUG(0,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
1628                                                 goto again;
1629                                         }
1630                                 }
1631                         }
1632                 }
1633         }
1634
1635         /* if we are not the recmaster then we do not need to check
1636            if recovery is needed
1637          */
1638         if (pnn != recmaster) {
1639                 goto again;
1640         }
1641
1642         /* update the list of public ips that a node can handle for
1643            all connected nodes
1644         */
1645         for (j=0; j<nodemap->num; j++) {
1646                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1647                         continue;
1648                 }
1649                 /* release any existing data */
1650                 if (ctdb->nodes[j]->public_ips) {
1651                         talloc_free(ctdb->nodes[j]->public_ips);
1652                         ctdb->nodes[j]->public_ips = NULL;
1653                 }
1654                 /* grab a new shiny list of public ips from the node */
1655                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
1656                         ctdb->nodes[j]->pnn, 
1657                         ctdb->nodes,
1658                         &ctdb->nodes[j]->public_ips)) {
1659                         DEBUG(0,("Failed to read public ips from node : %u\n", 
1660                                 ctdb->nodes[j]->pnn));
1661                         goto again;
1662                 }
1663         }
1664
1665
1666         /* verify that all active nodes agree that we are the recmaster */
1667         switch (verify_recmaster(ctdb, nodemap, pnn)) {
1668         case MONITOR_RECOVERY_NEEDED:
1669                 /* can not happen */
1670                 goto again;
1671         case MONITOR_ELECTION_NEEDED:
1672                 force_election(rec, mem_ctx, pnn, nodemap);
1673                 goto again;
1674         case MONITOR_OK:
1675                 break;
1676         case MONITOR_FAILED:
1677                 goto again;
1678         }
1679
1680
1681         if (rec->need_recovery) {
1682                 /* a previous recovery didn't finish */
1683                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1684                 goto again;             
1685         }
1686
1687         /* verify that all active nodes are in normal mode 
1688            and not in recovery mode 
1689          */
1690         switch (verify_recmode(ctdb, nodemap)) {
1691         case MONITOR_RECOVERY_NEEDED:
1692                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1693                 goto again;
1694         case MONITOR_FAILED:
1695                 goto again;
1696         case MONITOR_ELECTION_NEEDED:
1697                 /* can not happen */
1698         case MONITOR_OK:
1699                 break;
1700         }
1701
1702
1703         /* we should have the reclock - check its not stale */
1704         if (ctdb->recovery_lock_fd == -1) {
1705                 DEBUG(0,("recovery master doesn't have the recovery lock\n"));
1706                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1707                 goto again;
1708         }
1709
1710         if (read(ctdb->recovery_lock_fd, &c, 1) == -1) {
1711                 DEBUG(0,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
1712                 close(ctdb->recovery_lock_fd);
1713                 ctdb->recovery_lock_fd = -1;
1714                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1715                 goto again;
1716         }
1717
1718         /* get the nodemap for all active remote nodes and verify
1719            they are the same as for this node
1720          */
1721         for (j=0; j<nodemap->num; j++) {
1722                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1723                         continue;
1724                 }
1725                 if (nodemap->nodes[j].pnn == pnn) {
1726                         continue;
1727                 }
1728
1729                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1730                                            mem_ctx, &remote_nodemap);
1731                 if (ret != 0) {
1732                         DEBUG(0, (__location__ " Unable to get nodemap from remote node %u\n", 
1733                                   nodemap->nodes[j].pnn));
1734                         goto again;
1735                 }
1736
1737                 /* if the nodes disagree on how many nodes there are
1738                    then this is a good reason to try recovery
1739                  */
1740                 if (remote_nodemap->num != nodemap->num) {
1741                         DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
1742                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
1743                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1744                         goto again;
1745                 }
1746
1747                 /* if the nodes disagree on which nodes exist and are
1748                    active, then that is also a good reason to do recovery
1749                  */
1750                 for (i=0;i<nodemap->num;i++) {
1751                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
1752                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
1753                                           nodemap->nodes[j].pnn, i, 
1754                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
1755                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1756                                             vnnmap, nodemap->nodes[j].pnn);
1757                                 goto again;
1758                         }
1759                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
1760                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
1761                                 DEBUG(0, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
1762                                           nodemap->nodes[j].pnn, i,
1763                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
1764                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1765                                             vnnmap, nodemap->nodes[j].pnn);
1766                                 goto again;
1767                         }
1768                 }
1769
1770                 /* update our nodemap flags according to the other
1771                    server - this gets the NODE_FLAGS_DISABLED
1772                    flag. Note that the remote node is authoritative
1773                    for its flags (except CONNECTED, which we know
1774                    matches in this code) */
1775                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1776                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1777                         rec->need_takeover_run = true;
1778                 }
1779         }
1780
1781
1782         /* there better be the same number of lmasters in the vnn map
1783            as there are active nodes or we will have to do a recovery
1784          */
1785         if (vnnmap->size != num_active) {
1786                 DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
1787                           vnnmap->size, num_active));
1788                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, ctdb->pnn);
1789                 goto again;
1790         }
1791
1792         /* verify that all active nodes in the nodemap also exist in 
1793            the vnnmap.
1794          */
1795         for (j=0; j<nodemap->num; j++) {
1796                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1797                         continue;
1798                 }
1799                 if (nodemap->nodes[j].pnn == pnn) {
1800                         continue;
1801                 }
1802
1803                 for (i=0; i<vnnmap->size; i++) {
1804                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
1805                                 break;
1806                         }
1807                 }
1808                 if (i == vnnmap->size) {
1809                         DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
1810                                   nodemap->nodes[j].pnn));
1811                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1812                         goto again;
1813                 }
1814         }
1815
1816         
1817         /* verify that all other nodes have the same vnnmap
1818            and are from the same generation
1819          */
1820         for (j=0; j<nodemap->num; j++) {
1821                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1822                         continue;
1823                 }
1824                 if (nodemap->nodes[j].pnn == pnn) {
1825                         continue;
1826                 }
1827
1828                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1829                                           mem_ctx, &remote_vnnmap);
1830                 if (ret != 0) {
1831                         DEBUG(0, (__location__ " Unable to get vnnmap from remote node %u\n", 
1832                                   nodemap->nodes[j].pnn));
1833                         goto again;
1834                 }
1835
1836                 /* verify the vnnmap generation is the same */
1837                 if (vnnmap->generation != remote_vnnmap->generation) {
1838                         DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
1839                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
1840                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1841                         goto again;
1842                 }
1843
1844                 /* verify the vnnmap size is the same */
1845                 if (vnnmap->size != remote_vnnmap->size) {
1846                         DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
1847                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
1848                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, vnnmap, nodemap->nodes[j].pnn);
1849                         goto again;
1850                 }
1851
1852                 /* verify the vnnmap is the same */
1853                 for (i=0;i<vnnmap->size;i++) {
1854                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
1855                                 DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
1856                                           nodemap->nodes[j].pnn));
1857                                 do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1858                                             vnnmap, nodemap->nodes[j].pnn);
1859                                 goto again;
1860                         }
1861                 }
1862         }
1863
1864         /* we might need to change who has what IP assigned */
1865         if (rec->need_takeover_run) {
1866                 rec->need_takeover_run = false;
1867                 ret = ctdb_takeover_run(ctdb, nodemap);
1868                 if (ret != 0) {
1869                         DEBUG(0, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
1870                         do_recovery(rec, mem_ctx, pnn, num_active, nodemap, 
1871                                     vnnmap, ctdb->pnn);
1872                 }
1873         }
1874
1875         goto again;
1876
1877 }
1878
1879 /*
1880   event handler for when the main ctdbd dies
1881  */
1882 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
1883                                  uint16_t flags, void *private_data)
1884 {
1885         DEBUG(0,("recovery daemon parent died - exiting\n"));
1886         _exit(1);
1887 }
1888
1889 /*
1890   startup the recovery daemon as a child of the main ctdb daemon
1891  */
1892 int ctdb_start_recoverd(struct ctdb_context *ctdb)
1893 {
1894         int ret;
1895         int fd[2];
1896         pid_t child;
1897
1898         if (pipe(fd) != 0) {
1899                 return -1;
1900         }
1901
1902         child = fork();
1903         if (child == -1) {
1904                 return -1;
1905         }
1906         
1907         if (child != 0) {
1908                 close(fd[0]);
1909                 return 0;
1910         }
1911
1912         close(fd[1]);
1913
1914         /* shutdown the transport */
1915         ctdb->methods->shutdown(ctdb);
1916
1917         /* get a new event context */
1918         talloc_free(ctdb->ev);
1919         ctdb->ev = event_context_init(ctdb);
1920
1921         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
1922                      ctdb_recoverd_parent, &fd[0]);     
1923
1924         close(ctdb->daemon.sd);
1925         ctdb->daemon.sd = -1;
1926
1927         srandom(getpid() ^ time(NULL));
1928
1929         /* initialise ctdb */
1930         ret = ctdb_socket_connect(ctdb);
1931         if (ret != 0) {
1932                 DEBUG(0, (__location__ " Failed to init ctdb\n"));
1933                 exit(1);
1934         }
1935
1936         monitor_cluster(ctdb);
1937
1938         DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
1939         return -1;
1940 }