2db9109532d60a06c76da735de740125bf9a1ec9
[amitay/samba.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
/* list of "ctdb ipreallocate" processes to call back when we have
   finished the takeover run.

   The list is singly linked through 'next'; its head is kept in
   struct ctdb_recoverd (reallocate_callers).
*/
struct ip_reallocate_list {
        struct ip_reallocate_list *next;        /* next waiting caller, or NULL */
        struct rd_memdump_reply *rd;            /* NOTE(review): presumably the reply address of the caller - confirm against struct rd_memdump_reply */
};
41
/*
  per-node misbehaviour bookkeeping, hung off a node's ban_state pointer
  by ctdb_set_culprit_count()
 */
struct ctdb_banning_state {
        uint32_t count;                         /* accumulated culprit credits; reset once the grace period has elapsed */
        struct timeval last_reported_time;      /* when the node was last recorded as a culprit */
};
46
/*
  private state of recovery daemon

  Only the fields whose use is visible in the reviewed part of the file
  carry a comment.
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;              /* the ctdb context this recovery daemon works on */
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
        uint32_t last_culprit_node;             /* node most recently passed to ctdb_set_culprit_count() */
        struct ctdb_node_map *nodemap;          /* nodemap->num is used as the credit count by the fail callbacks */
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;        /* head of the list of vacuum fetches in progress */
        TALLOC_CTX *ip_reallocate_ctx;
        struct ip_reallocate_list *reallocate_callers;  /* head of the list of waiting "ctdb ipreallocate" callers */
        TALLOC_CTX *ip_check_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
};
69
/* Timeouts handed to controls issued by the recovery daemon.
   Both macros expand to an expression that reads ctdb->tunable, so they
   can only be used where a variable named 'ctdb' is in scope. */
/* timeout derived from the recover_timeout tunable */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
/* timeout derived from the recover_interval tunable */
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* timed-event handler, declared here for early use; its definition is
   outside the part of the file reviewed here */
static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
/* outcome codes for the monitoring code.
   NOTE(review): the users of this enum are outside the part of the file
   reviewed here; the meaning of each value is inferred from its name only. */
enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit > ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
/*
  remember the trouble maker, charging it a single credit
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        const uint32_t single_credit = 1;

        ctdb_set_culprit_count(rec, culprit, single_credit);
}
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222 }
223
224 /*
225   update the node capabilities for all connected nodes
226  */
227 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
228 {
229         uint32_t *nodes;
230         TALLOC_CTX *tmp_ctx;
231
232         tmp_ctx = talloc_new(ctdb);
233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
234
235         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
236         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
237                                         nodes, 0,
238                                         CONTROL_TIMEOUT(),
239                                         false, tdb_null,
240                                         async_getcap_callback, NULL,
241                                         NULL) != 0) {
242                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
243                 talloc_free(tmp_ctx);
244                 return -1;
245         }
246
247         talloc_free(tmp_ctx);
248         return 0;
249 }
250
251 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
256         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
257 }
258
259 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
260 {
261         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
262
263         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
264         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
265 }
266
267 /*
268   change recovery mode on all nodes
269  */
270 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
271 {
272         TDB_DATA data;
273         uint32_t *nodes;
274         TALLOC_CTX *tmp_ctx;
275
276         tmp_ctx = talloc_new(ctdb);
277         CTDB_NO_MEMORY(ctdb, tmp_ctx);
278
279         /* freeze all nodes */
280         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
281         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
282                 int i;
283
284                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
285                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
286                                                 nodes, i,
287                                                 CONTROL_TIMEOUT(),
288                                                 false, tdb_null,
289                                                 NULL,
290                                                 set_recmode_fail_callback,
291                                                 rec) != 0) {
292                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
293                                 talloc_free(tmp_ctx);
294                                 return -1;
295                         }
296                 }
297         }
298
299
300         data.dsize = sizeof(uint32_t);
301         data.dptr = (unsigned char *)&rec_mode;
302
303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
304                                         nodes, 0,
305                                         CONTROL_TIMEOUT(),
306                                         false, data,
307                                         NULL, NULL,
308                                         NULL) != 0) {
309                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
310                 talloc_free(tmp_ctx);
311                 return -1;
312         }
313
314         talloc_free(tmp_ctx);
315         return 0;
316 }
317
318 /*
319   change recovery master on all node
320  */
321 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
322 {
323         TDB_DATA data;
324         TALLOC_CTX *tmp_ctx;
325         uint32_t *nodes;
326
327         tmp_ctx = talloc_new(ctdb);
328         CTDB_NO_MEMORY(ctdb, tmp_ctx);
329
330         data.dsize = sizeof(uint32_t);
331         data.dptr = (unsigned char *)&pnn;
332
333         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
335                                         nodes, 0,
336                                         CONTROL_TIMEOUT(), false, data,
337                                         NULL, NULL,
338                                         NULL) != 0) {
339                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
340                 talloc_free(tmp_ctx);
341                 return -1;
342         }
343
344         talloc_free(tmp_ctx);
345         return 0;
346 }
347
348 /* update all remote nodes to use the same db priority that we have
349    this can fail if the remove node has not yet been upgraded to 
350    support this function, so we always return success and never fail
351    a recovery if this call fails.
352 */
353 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
354         struct ctdb_node_map *nodemap, 
355         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int db;
358         uint32_t *nodes;
359
360         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
361
362         /* step through all local databases */
363         for (db=0; db<dbmap->num;db++) {
364                 TDB_DATA data;
365                 struct ctdb_db_priority db_prio;
366                 int ret;
367
368                 db_prio.db_id     = dbmap->dbs[db].dbid;
369                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
372                         continue;
373                 }
374
375                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
376
377                 data.dptr  = (uint8_t *)&db_prio;
378                 data.dsize = sizeof(db_prio);
379
380                 if (ctdb_client_async_control(ctdb,
381                                         CTDB_CONTROL_SET_DB_PRIORITY,
382                                         nodes, 0,
383                                         CONTROL_TIMEOUT(), false, data,
384                                         NULL, NULL,
385                                         NULL) != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
387                 }
388         }
389
390         return 0;
391 }                       
392
393 /*
394   ensure all other nodes have attached to any databases that we have
395  */
396 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
397                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
398 {
399         int i, j, db, ret;
400         struct ctdb_dbid_map *remote_dbmap;
401
402         /* verify that all other nodes have all our databases */
403         for (j=0; j<nodemap->num; j++) {
404                 /* we dont need to ourself ourselves */
405                 if (nodemap->nodes[j].pnn == pnn) {
406                         continue;
407                 }
408                 /* dont check nodes that are unavailable */
409                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
410                         continue;
411                 }
412
413                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
414                                          mem_ctx, &remote_dbmap);
415                 if (ret != 0) {
416                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
417                         return -1;
418                 }
419
420                 /* step through all local databases */
421                 for (db=0; db<dbmap->num;db++) {
422                         const char *name;
423
424
425                         for (i=0;i<remote_dbmap->num;i++) {
426                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
427                                         break;
428                                 }
429                         }
430                         /* the remote node already have this database */
431                         if (i!=remote_dbmap->num) {
432                                 continue;
433                         }
434                         /* ok so we need to create this database */
435                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
436                                             mem_ctx, &name);
437                         if (ret != 0) {
438                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
439                                 return -1;
440                         }
441                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                            mem_ctx, name, dbmap->dbs[db].persistent);
443                         if (ret != 0) {
444                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
445                                 return -1;
446                         }
447                 }
448         }
449
450         return 0;
451 }
452
453
454 /*
455   ensure we are attached to any databases that anyone else is attached to
456  */
457 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
458                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int i, j, db, ret;
461         struct ctdb_dbid_map *remote_dbmap;
462
463         /* verify that we have all database any other node has */
464         for (j=0; j<nodemap->num; j++) {
465                 /* we dont need to ourself ourselves */
466                 if (nodemap->nodes[j].pnn == pnn) {
467                         continue;
468                 }
469                 /* dont check nodes that are unavailable */
470                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
471                         continue;
472                 }
473
474                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
475                                          mem_ctx, &remote_dbmap);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
478                         return -1;
479                 }
480
481                 /* step through all databases on the remote node */
482                 for (db=0; db<remote_dbmap->num;db++) {
483                         const char *name;
484
485                         for (i=0;i<(*dbmap)->num;i++) {
486                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
487                                         break;
488                                 }
489                         }
490                         /* we already have this db locally */
491                         if (i!=(*dbmap)->num) {
492                                 continue;
493                         }
494                         /* ok so we need to create this database and
495                            rebuild dbmap
496                          */
497                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
499                         if (ret != 0) {
500                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
501                                           nodemap->nodes[j].pnn));
502                                 return -1;
503                         }
504                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
505                                            remote_dbmap->dbs[db].persistent);
506                         if (ret != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
508                                 return -1;
509                         }
510                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
511                         if (ret != 0) {
512                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
513                                 return -1;
514                         }
515                 }
516         }
517
518         return 0;
519 }
520
521
522 /*
523   pull the remote database contents from one node into the recdb
524  */
525 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
526                                     struct tdb_wrap *recdb, uint32_t dbid,
527                                     bool persistent)
528 {
529         int ret;
530         TDB_DATA outdata;
531         struct ctdb_marshall_buffer *reply;
532         struct ctdb_rec_data *rec;
533         int i;
534         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
535
536         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
537                                CONTROL_TIMEOUT(), &outdata);
538         if (ret != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
540                 talloc_free(tmp_ctx);
541                 return -1;
542         }
543
544         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
545
546         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
547                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
548                 talloc_free(tmp_ctx);
549                 return -1;
550         }
551         
552         rec = (struct ctdb_rec_data *)&reply->data[0];
553         
554         for (i=0;
555              i<reply->count;
556              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
557                 TDB_DATA key, data;
558                 struct ctdb_ltdb_header *hdr;
559                 TDB_DATA existing;
560                 
561                 key.dptr = &rec->data[0];
562                 key.dsize = rec->keylen;
563                 data.dptr = &rec->data[key.dsize];
564                 data.dsize = rec->datalen;
565                 
566                 hdr = (struct ctdb_ltdb_header *)data.dptr;
567
568                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
569                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
570                         talloc_free(tmp_ctx);
571                         return -1;
572                 }
573
574                 /* fetch the existing record, if any */
575                 existing = tdb_fetch(recdb->tdb, key);
576                 
577                 if (existing.dptr != NULL) {
578                         struct ctdb_ltdb_header header;
579                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
580                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
581                                          (unsigned)existing.dsize, srcnode));
582                                 free(existing.dptr);
583                                 talloc_free(tmp_ctx);
584                                 return -1;
585                         }
586                         header = *(struct ctdb_ltdb_header *)existing.dptr;
587                         free(existing.dptr);
588                         if (!(header.rsn < hdr->rsn ||
589                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
590                                 continue;
591                         }
592                 }
593                 
594                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
595                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
596                         talloc_free(tmp_ctx);
597                         return -1;                              
598                 }
599         }
600
601         talloc_free(tmp_ctx);
602
603         return 0;
604 }
605
606 /*
607   pull all the remote database contents into the recdb
608  */
609 static int pull_remote_database(struct ctdb_context *ctdb,
610                                 struct ctdb_recoverd *rec, 
611                                 struct ctdb_node_map *nodemap, 
612                                 struct tdb_wrap *recdb, uint32_t dbid,
613                                 bool persistent)
614 {
615         int j;
616
617         /* pull all records from all other nodes across onto this node
618            (this merges based on rsn)
619         */
620         for (j=0; j<nodemap->num; j++) {
621                 /* dont merge from nodes that are unavailable */
622                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
623                         continue;
624                 }
625                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
626                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
627                                  nodemap->nodes[j].pnn));
628                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
629                         return -1;
630                 }
631         }
632         
633         return 0;
634 }
635
636
637 /*
638   update flags on all active nodes
639  */
640 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
641 {
642         int ret;
643
644         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
645                 if (ret != 0) {
646                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
647                 return -1;
648         }
649
650         return 0;
651 }
652
653 /*
654   ensure all nodes have the same vnnmap we do
655  */
656 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
657                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
658 {
659         int j, ret;
660
661         /* push the new vnn map out to all the nodes */
662         for (j=0; j<nodemap->num; j++) {
663                 /* dont push to nodes that are unavailable */
664                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
665                         continue;
666                 }
667
668                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
669                 if (ret != 0) {
670                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
671                         return -1;
672                 }
673         }
674
675         return 0;
676 }
677
678
/*
  state of one vacuum fetch run in progress

  Instances are linked into rec->vacuum_info and unlinked again by
  vacuum_info_destructor().
 */
struct vacuum_info {
        struct vacuum_info *next, *prev;        /* list linkage */
        struct ctdb_recoverd *rec;              /* recovery daemon state owning the list */
        uint32_t srcnode;                       /* node the records came from */
        struct ctdb_db_context *ctdb_db;        /* database the records belong to */
        struct ctdb_marshall_buffer *recs;      /* record buffer; recs->count is the number of records still to process */
        struct ctdb_rec_data *r;                /* next record to process inside recs */
};

/* declared early because vacuum_fetch_callback() below calls it */
static void vacuum_fetch_next(struct vacuum_info *v);
689
690 /*
691   called when a vacuum fetch has completed - just free it and do the next one
692  */
693 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
694 {
695         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
696         talloc_free(state);
697         vacuum_fetch_next(v);
698 }
699
700
701 /*
702   process the next element from the vacuum list
703 */
704 static void vacuum_fetch_next(struct vacuum_info *v)
705 {
706         struct ctdb_call call;
707         struct ctdb_rec_data *r;
708
709         while (v->recs->count) {
710                 struct ctdb_client_call_state *state;
711                 TDB_DATA data;
712                 struct ctdb_ltdb_header *hdr;
713
714                 ZERO_STRUCT(call);
715                 call.call_id = CTDB_NULL_FUNC;
716                 call.flags = CTDB_IMMEDIATE_MIGRATION;
717                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
718
719                 r = v->r;
720                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
721                 v->recs->count--;
722
723                 call.key.dptr = &r->data[0];
724                 call.key.dsize = r->keylen;
725
726                 /* ensure we don't block this daemon - just skip a record if we can't get
727                    the chainlock */
728                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
729                         continue;
730                 }
731
732                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
733                 if (data.dptr == NULL) {
734                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
735                         continue;
736                 }
737
738                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
739                         free(data.dptr);
740                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
741                         continue;
742                 }
743                 
744                 hdr = (struct ctdb_ltdb_header *)data.dptr;
745                 if (hdr->dmaster == v->rec->ctdb->pnn) {
746                         /* its already local */
747                         free(data.dptr);
748                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
749                         continue;
750                 }
751
752                 free(data.dptr);
753
754                 state = ctdb_call_send(v->ctdb_db, &call);
755                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
756                 if (state == NULL) {
757                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
758                         talloc_free(v);
759                         return;
760                 }
761                 state->async.fn = vacuum_fetch_callback;
762                 state->async.private_data = v;
763                 return;
764         }
765
766         talloc_free(v);
767 }
768
769
770 /*
771   destroy a vacuum info structure
772  */
773 static int vacuum_info_destructor(struct vacuum_info *v)
774 {
775         DLIST_REMOVE(v->rec->vacuum_info, v);
776         return 0;
777 }
778
779
780 /*
781   handler for vacuum fetch
782 */
783 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
784                                  TDB_DATA data, void *private_data)
785 {
786         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
787         struct ctdb_marshall_buffer *recs;
788         int ret, i;
789         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
790         const char *name;
791         struct ctdb_dbid_map *dbmap=NULL;
792         bool persistent = false;
793         struct ctdb_db_context *ctdb_db;
794         struct ctdb_rec_data *r;
795         uint32_t srcnode;
796         struct vacuum_info *v;
797
798         recs = (struct ctdb_marshall_buffer *)data.dptr;
799         r = (struct ctdb_rec_data *)&recs->data[0];
800
801         if (recs->count == 0) {
802                 talloc_free(tmp_ctx);
803                 return;
804         }
805
806         srcnode = r->reqid;
807
808         for (v=rec->vacuum_info;v;v=v->next) {
809                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
810                         /* we're already working on records from this node */
811                         talloc_free(tmp_ctx);
812                         return;
813                 }
814         }
815
816         /* work out if the database is persistent */
817         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
818         if (ret != 0) {
819                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
820                 talloc_free(tmp_ctx);
821                 return;
822         }
823
824         for (i=0;i<dbmap->num;i++) {
825                 if (dbmap->dbs[i].dbid == recs->db_id) {
826                         persistent = dbmap->dbs[i].persistent;
827                         break;
828                 }
829         }
830         if (i == dbmap->num) {
831                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
832                 talloc_free(tmp_ctx);
833                 return;         
834         }
835
836         /* find the name of this database */
837         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
838                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
839                 talloc_free(tmp_ctx);
840                 return;
841         }
842
843         /* attach to it */
844         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
845         if (ctdb_db == NULL) {
846                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
847                 talloc_free(tmp_ctx);
848                 return;
849         }
850
851         v = talloc_zero(rec, struct vacuum_info);
852         if (v == NULL) {
853                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
854                 talloc_free(tmp_ctx);
855                 return;
856         }
857
858         v->rec = rec;
859         v->srcnode = srcnode;
860         v->ctdb_db = ctdb_db;
861         v->recs = talloc_memdup(v, recs, data.dsize);
862         if (v->recs == NULL) {
863                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
864                 talloc_free(v);
865                 talloc_free(tmp_ctx);
866                 return;         
867         }
868         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
869
870         DLIST_ADD(rec->vacuum_info, v);
871
872         talloc_set_destructor(v, vacuum_info_destructor);
873
874         vacuum_fetch_next(v);
875         talloc_free(tmp_ctx);
876 }
877
878
/*
  timed event callback used by ctdb_wait_timeout: sets the uint32_t
  flag that p points to, which ends the wait loop
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
                              struct timeval yt, void *p)
{
        uint32_t *flag = p;

        *flag = 1;
}
888
889 /*
890   wait for a given number of seconds
891  */
892 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
893 {
894         uint32_t timed_out = 0;
895         time_t usecs = (secs - (time_t)secs) * 1000000;
896         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
897         while (!timed_out) {
898                 event_loop_once(ctdb->ev);
899         }
900 }
901
902 /*
903   called when an election times out (ends)
904  */
905 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
906                                   struct timeval t, void *p)
907 {
908         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
909         rec->election_timeout = NULL;
910         fast_start = false;
911
912         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
913 }
914
915
916 /*
917   wait for an election to finish. It finished election_timeout seconds after
918   the last election packet is received
919  */
920 static void ctdb_wait_election(struct ctdb_recoverd *rec)
921 {
922         struct ctdb_context *ctdb = rec->ctdb;
923         while (rec->election_timeout) {
924                 event_loop_once(ctdb->ev);
925         }
926 }
927
928 /*
929   Update our local flags from all remote connected nodes. 
930   This is only run when we are or we belive we are the recovery master
931  */
932 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
933 {
934         int j;
935         struct ctdb_context *ctdb = rec->ctdb;
936         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
937
938         /* get the nodemap for all active remote nodes and verify
939            they are the same as for this node
940          */
941         for (j=0; j<nodemap->num; j++) {
942                 struct ctdb_node_map *remote_nodemap=NULL;
943                 int ret;
944
945                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
946                         continue;
947                 }
948                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
949                         continue;
950                 }
951
952                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
953                                            mem_ctx, &remote_nodemap);
954                 if (ret != 0) {
955                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
956                                   nodemap->nodes[j].pnn));
957                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
958                         talloc_free(mem_ctx);
959                         return MONITOR_FAILED;
960                 }
961                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
962                         /* We should tell our daemon about this so it
963                            updates its flags or else we will log the same 
964                            message again in the next iteration of recovery.
965                            Since we are the recovery master we can just as
966                            well update the flags on all nodes.
967                         */
968                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
969                         if (ret != 0) {
970                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
971                                 return -1;
972                         }
973
974                         /* Update our local copy of the flags in the recovery
975                            daemon.
976                         */
977                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
978                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
979                                  nodemap->nodes[j].flags));
980                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
981                 }
982                 talloc_free(remote_nodemap);
983         }
984         talloc_free(mem_ctx);
985         return MONITOR_OK;
986 }
987
988
989 /* Create a new random generation ip. 
990    The generation id can not be the INVALID_GENERATION id
991 */
992 static uint32_t new_generation(void)
993 {
994         uint32_t generation;
995
996         while (1) {
997                 generation = random();
998
999                 if (generation != INVALID_GENERATION) {
1000                         break;
1001                 }
1002         }
1003
1004         return generation;
1005 }
1006
1007
1008 /*
1009   create a temporary working database
1010  */
1011 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1012 {
1013         char *name;
1014         struct tdb_wrap *recdb;
1015         unsigned tdb_flags;
1016
1017         /* open up the temporary recovery database */
1018         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1019                                ctdb->db_directory_state,
1020                                ctdb->pnn);
1021         if (name == NULL) {
1022                 return NULL;
1023         }
1024         unlink(name);
1025
1026         tdb_flags = TDB_NOLOCK;
1027         if (ctdb->valgrinding) {
1028                 tdb_flags |= TDB_NOMMAP;
1029         }
1030         tdb_flags |= TDB_DISALLOW_NESTING;
1031
1032         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1033                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1034         if (recdb == NULL) {
1035                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1036         }
1037
1038         talloc_free(name);
1039
1040         return recdb;
1041 }
1042
1043
1044 /* 
1045    a traverse function for pulling all relevent records from recdb
1046  */
1047 struct recdb_data {
1048         struct ctdb_context *ctdb;
1049         struct ctdb_marshall_buffer *recdata;
1050         uint32_t len;
1051         bool failed;
1052         bool persistent;
1053 };
1054
1055 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1056 {
1057         struct recdb_data *params = (struct recdb_data *)p;
1058         struct ctdb_rec_data *rec;
1059         struct ctdb_ltdb_header *hdr;
1060
1061         /* skip empty records */
1062         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1063                 return 0;
1064         }
1065
1066         /* update the dmaster field to point to us */
1067         hdr = (struct ctdb_ltdb_header *)data.dptr;
1068         if (!params->persistent) {
1069                 hdr->dmaster = params->ctdb->pnn;
1070                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1071         }
1072
1073         /* add the record to the blob ready to send to the nodes */
1074         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1075         if (rec == NULL) {
1076                 params->failed = true;
1077                 return -1;
1078         }
1079         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1080         if (params->recdata == NULL) {
1081                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1082                          rec->length + params->len, params->recdata->count));
1083                 params->failed = true;
1084                 return -1;
1085         }
1086         params->recdata->count++;
1087         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1088         params->len += rec->length;
1089         talloc_free(rec);
1090
1091         return 0;
1092 }
1093
1094 /*
1095   push the recdb database out to all nodes
1096  */
1097 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1098                                bool persistent,
1099                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1100 {
1101         struct recdb_data params;
1102         struct ctdb_marshall_buffer *recdata;
1103         TDB_DATA outdata;
1104         TALLOC_CTX *tmp_ctx;
1105         uint32_t *nodes;
1106
1107         tmp_ctx = talloc_new(ctdb);
1108         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1109
1110         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1111         CTDB_NO_MEMORY(ctdb, recdata);
1112
1113         recdata->db_id = dbid;
1114
1115         params.ctdb = ctdb;
1116         params.recdata = recdata;
1117         params.len = offsetof(struct ctdb_marshall_buffer, data);
1118         params.failed = false;
1119         params.persistent = persistent;
1120
1121         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1122                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1123                 talloc_free(params.recdata);
1124                 talloc_free(tmp_ctx);
1125                 return -1;
1126         }
1127
1128         if (params.failed) {
1129                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1130                 talloc_free(params.recdata);
1131                 talloc_free(tmp_ctx);
1132                 return -1;              
1133         }
1134
1135         recdata = params.recdata;
1136
1137         outdata.dptr = (void *)recdata;
1138         outdata.dsize = params.len;
1139
1140         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1141         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1142                                         nodes, 0,
1143                                         CONTROL_TIMEOUT(), false, outdata,
1144                                         NULL, NULL,
1145                                         NULL) != 0) {
1146                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1147                 talloc_free(recdata);
1148                 talloc_free(tmp_ctx);
1149                 return -1;
1150         }
1151
1152         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1153                   dbid, recdata->count));
1154
1155         talloc_free(recdata);
1156         talloc_free(tmp_ctx);
1157
1158         return 0;
1159 }
1160
1161
1162 /*
1163   go through a full recovery on one database 
1164  */
1165 static int recover_database(struct ctdb_recoverd *rec, 
1166                             TALLOC_CTX *mem_ctx,
1167                             uint32_t dbid,
1168                             bool persistent,
1169                             uint32_t pnn, 
1170                             struct ctdb_node_map *nodemap,
1171                             uint32_t transaction_id)
1172 {
1173         struct tdb_wrap *recdb;
1174         int ret;
1175         struct ctdb_context *ctdb = rec->ctdb;
1176         TDB_DATA data;
1177         struct ctdb_control_wipe_database w;
1178         uint32_t *nodes;
1179
1180         recdb = create_recdb(ctdb, mem_ctx);
1181         if (recdb == NULL) {
1182                 return -1;
1183         }
1184
1185         /* pull all remote databases onto the recdb */
1186         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1187         if (ret != 0) {
1188                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1189                 return -1;
1190         }
1191
1192         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1193
1194         /* wipe all the remote databases. This is safe as we are in a transaction */
1195         w.db_id = dbid;
1196         w.transaction_id = transaction_id;
1197
1198         data.dptr = (void *)&w;
1199         data.dsize = sizeof(w);
1200
1201         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1202         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1203                                         nodes, 0,
1204                                         CONTROL_TIMEOUT(), false, data,
1205                                         NULL, NULL,
1206                                         NULL) != 0) {
1207                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1208                 talloc_free(recdb);
1209                 return -1;
1210         }
1211         
1212         /* push out the correct database. This sets the dmaster and skips 
1213            the empty records */
1214         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1215         if (ret != 0) {
1216                 talloc_free(recdb);
1217                 return -1;
1218         }
1219
1220         /* all done with this database */
1221         talloc_free(recdb);
1222
1223         return 0;
1224 }
1225
1226 /*
1227   reload the nodes file 
1228 */
1229 static void reload_nodes_file(struct ctdb_context *ctdb)
1230 {
1231         ctdb->nodes = NULL;
1232         ctdb_load_nodes_file(ctdb);
1233 }
1234
1235 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1236                                          struct ctdb_recoverd *rec,
1237                                          struct ctdb_node_map *nodemap,
1238                                          uint32_t *culprit)
1239 {
1240         int j;
1241         int ret;
1242
1243         if (ctdb->num_nodes != nodemap->num) {
1244                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1245                                   ctdb->num_nodes, nodemap->num));
1246                 if (culprit) {
1247                         *culprit = ctdb->pnn;
1248                 }
1249                 return -1;
1250         }
1251
1252         for (j=0; j<nodemap->num; j++) {
1253                 /* release any existing data */
1254                 if (ctdb->nodes[j]->known_public_ips) {
1255                         talloc_free(ctdb->nodes[j]->known_public_ips);
1256                         ctdb->nodes[j]->known_public_ips = NULL;
1257                 }
1258                 if (ctdb->nodes[j]->available_public_ips) {
1259                         talloc_free(ctdb->nodes[j]->available_public_ips);
1260                         ctdb->nodes[j]->available_public_ips = NULL;
1261                 }
1262
1263                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1264                         continue;
1265                 }
1266
1267                 /* grab a new shiny list of public ips from the node */
1268                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1269                                         CONTROL_TIMEOUT(),
1270                                         ctdb->nodes[j]->pnn,
1271                                         ctdb->nodes,
1272                                         0,
1273                                         &ctdb->nodes[j]->known_public_ips);
1274                 if (ret != 0) {
1275                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1276                                 ctdb->nodes[j]->pnn));
1277                         if (culprit) {
1278                                 *culprit = ctdb->nodes[j]->pnn;
1279                         }
1280                         return -1;
1281                 }
1282
1283                 if (ctdb->tunable.disable_ip_failover == 0) {
1284                         if (rec->ip_check_disable_ctx == NULL) {
1285                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1286                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1287                                         rec->need_takeover_run = true;
1288                                 }
1289                         }
1290                 }
1291
1292                 /* grab a new shiny list of public ips from the node */
1293                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1294                                         CONTROL_TIMEOUT(),
1295                                         ctdb->nodes[j]->pnn,
1296                                         ctdb->nodes,
1297                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1298                                         &ctdb->nodes[j]->available_public_ips);
1299                 if (ret != 0) {
1300                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1301                                 ctdb->nodes[j]->pnn));
1302                         if (culprit) {
1303                                 *culprit = ctdb->nodes[j]->pnn;
1304                         }
1305                         return -1;
1306                 }
1307         }
1308
1309         return 0;
1310 }
1311
1312 /* when we start a recovery, make sure all nodes use the same reclock file
1313    setting
1314 */
1315 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1316 {
1317         struct ctdb_context *ctdb = rec->ctdb;
1318         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1319         TDB_DATA data;
1320         uint32_t *nodes;
1321
1322         if (ctdb->recovery_lock_file == NULL) {
1323                 data.dptr  = NULL;
1324                 data.dsize = 0;
1325         } else {
1326                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1327                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1328         }
1329
1330         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1332                                         nodes, 0,
1333                                         CONTROL_TIMEOUT(),
1334                                         false, data,
1335                                         NULL, NULL,
1336                                         rec) != 0) {
1337                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1338                 talloc_free(tmp_ctx);
1339                 return -1;
1340         }
1341
1342         talloc_free(tmp_ctx);
1343         return 0;
1344 }
1345
1346
1347 /*
1348   we are the recmaster, and recovery is needed - start a recovery run
1349  */
1350 static int do_recovery(struct ctdb_recoverd *rec, 
1351                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1352                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1353 {
1354         struct ctdb_context *ctdb = rec->ctdb;
1355         int i, j, ret;
1356         uint32_t generation;
1357         struct ctdb_dbid_map *dbmap;
1358         TDB_DATA data;
1359         uint32_t *nodes;
1360         struct timeval start_time;
1361         uint32_t culprit = (uint32_t)-1;
1362
1363         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1364
1365         /* if recovery fails, force it again */
1366         rec->need_recovery = true;
1367
1368         for (i=0; i<ctdb->num_nodes; i++) {
1369                 struct ctdb_banning_state *ban_state;
1370
1371                 if (ctdb->nodes[i]->ban_state == NULL) {
1372                         continue;
1373                 }
1374                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1375                 if (ban_state->count < 2*ctdb->num_nodes) {
1376                         continue;
1377                 }
1378                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1379                         ctdb->nodes[i]->pnn, ban_state->count,
1380                         ctdb->tunable.recovery_ban_period));
1381                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1382                 ban_state->count = 0;
1383         }
1384
1385
1386         if (ctdb->tunable.verify_recovery_lock != 0) {
1387                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1388                 start_time = timeval_current();
1389                 if (!ctdb_recovery_lock(ctdb, true)) {
1390                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1391                                          "and ban ourself for %u seconds\n",
1392                                          ctdb->tunable.recovery_ban_period));
1393                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1394                         return -1;
1395                 }
1396                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1397                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1398         }
1399
1400         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1401
1402         /* get a list of all databases */
1403         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1404         if (ret != 0) {
1405                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1406                 return -1;
1407         }
1408
1409         /* we do the db creation before we set the recovery mode, so the freeze happens
1410            on all databases we will be dealing with. */
1411
1412         /* verify that we have all the databases any other node has */
1413         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1414         if (ret != 0) {
1415                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1416                 return -1;
1417         }
1418
1419         /* verify that all other nodes have all our databases */
1420         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1421         if (ret != 0) {
1422                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1423                 return -1;
1424         }
1425         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1426
1427         /* update the database priority for all remote databases */
1428         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1429         if (ret != 0) {
1430                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1431         }
1432         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1433
1434
1435         /* update all other nodes to use the same setting for reclock files
1436            as the local recovery master.
1437         */
1438         sync_recovery_lock_file_across_cluster(rec);
1439
1440         /* set recovery mode to active on all nodes */
1441         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1442         if (ret != 0) {
1443                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1444                 return -1;
1445         }
1446
1447         /* execute the "startrecovery" event script on all nodes */
1448         ret = run_startrecovery_eventscript(rec, nodemap);
1449         if (ret!=0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1451                 return -1;
1452         }
1453
1454         /*
1455           update all nodes to have the same flags that we have
1456          */
1457         for (i=0;i<nodemap->num;i++) {
1458                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1459                         continue;
1460                 }
1461
1462                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1463                 if (ret != 0) {
1464                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1465                         return -1;
1466                 }
1467         }
1468
1469         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1470
1471         /* pick a new generation number */
1472         generation = new_generation();
1473
1474         /* change the vnnmap on this node to use the new generation 
1475            number but not on any other nodes.
1476            this guarantees that if we abort the recovery prematurely
1477            for some reason (a node stops responding?)
1478            that we can just return immediately and we will reenter
1479            recovery shortly again.
1480            I.e. we deliberately leave the cluster with an inconsistent
1481            generation id to allow us to abort recovery at any stage and
1482            just restart it from scratch.
1483          */
1484         vnnmap->generation = generation;
1485         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1486         if (ret != 0) {
1487                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1488                 return -1;
1489         }
1490
1491         data.dptr = (void *)&generation;
1492         data.dsize = sizeof(uint32_t);
1493
1494         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1495         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1496                                         nodes, 0,
1497                                         CONTROL_TIMEOUT(), false, data,
1498                                         NULL,
1499                                         transaction_start_fail_callback,
1500                                         rec) != 0) {
1501                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1502                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1503                                         nodes, 0,
1504                                         CONTROL_TIMEOUT(), false, tdb_null,
1505                                         NULL,
1506                                         NULL,
1507                                         NULL) != 0) {
1508                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1509                 }
1510                 return -1;
1511         }
1512
1513         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1514
1515         for (i=0;i<dbmap->num;i++) {
1516                 ret = recover_database(rec, mem_ctx,
1517                                        dbmap->dbs[i].dbid,
1518                                        dbmap->dbs[i].persistent,
1519                                        pnn, nodemap, generation);
1520                 if (ret != 0) {
1521                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1522                         return -1;
1523                 }
1524         }
1525
1526         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1527
1528         /* commit all the changes */
1529         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1530                                         nodes, 0,
1531                                         CONTROL_TIMEOUT(), false, data,
1532                                         NULL, NULL,
1533                                         NULL) != 0) {
1534                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1535                 return -1;
1536         }
1537
1538         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1539         
1540
1541         /* update the capabilities for all nodes */
1542         ret = update_capabilities(ctdb, nodemap);
1543         if (ret!=0) {
1544                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1545                 return -1;
1546         }
1547
1548         /* build a new vnn map with all the currently active and
1549            unbanned nodes */
1550         generation = new_generation();
1551         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1552         CTDB_NO_MEMORY(ctdb, vnnmap);
1553         vnnmap->generation = generation;
1554         vnnmap->size = 0;
1555         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1556         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1557         for (i=j=0;i<nodemap->num;i++) {
1558                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1559                         continue;
1560                 }
1561                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1562                         /* this node can not be an lmaster */
1563                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1564                         continue;
1565                 }
1566
1567                 vnnmap->size++;
1568                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1569                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1570                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1571
1572         }
1573         if (vnnmap->size == 0) {
1574                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1575                 vnnmap->size++;
1576                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1577                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1578                 vnnmap->map[0] = pnn;
1579         }       
1580
1581         /* update to the new vnnmap on all nodes */
1582         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1583         if (ret != 0) {
1584                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1585                 return -1;
1586         }
1587
1588         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1589
1590         /* update recmaster to point to us for all nodes */
1591         ret = set_recovery_master(ctdb, nodemap, pnn);
1592         if (ret!=0) {
1593                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1594                 return -1;
1595         }
1596
1597         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1598
1599         /*
1600           update all nodes to have the same flags that we have
1601          */
1602         for (i=0;i<nodemap->num;i++) {
1603                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1604                         continue;
1605                 }
1606
1607                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1608                 if (ret != 0) {
1609                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1610                         return -1;
1611                 }
1612         }
1613
1614         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1615
1616         /* disable recovery mode */
1617         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1618         if (ret != 0) {
1619                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1620                 return -1;
1621         }
1622
1623         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1624
1625         /*
1626           tell nodes to takeover their public IPs
1627          */
1628         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1629         if (ret != 0) {
1630                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1631                                  culprit));
1632                 rec->need_takeover_run = true;
1633                 return -1;
1634         }
1635         rec->need_takeover_run = false;
1636         ret = ctdb_takeover_run(ctdb, nodemap);
1637         if (ret != 0) {
1638                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1639                 rec->need_takeover_run = true;
1640         }
1641
1642         /* execute the "recovered" event script on all nodes */
1643         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1644         if (ret!=0) {
1645                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1646                 return -1;
1647         }
1648
1649         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1650
1651         /* send a message to all clients telling them that the cluster 
1652            has been reconfigured */
1653         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1654
1655         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1656
1657         rec->need_recovery = false;
1658
1659         /* we managed to complete a full recovery, make sure to forgive
1660            any past sins by the nodes that could now participate in the
1661            recovery.
1662         */
1663         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1664         for (i=0;i<nodemap->num;i++) {
1665                 struct ctdb_banning_state *ban_state;
1666
1667                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1668                         continue;
1669                 }
1670
1671                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1672                 if (ban_state == NULL) {
1673                         continue;
1674                 }
1675
1676                 ban_state->count = 0;
1677         }
1678
1679
1680         /* We just finished a recovery successfully. 
1681            We now wait for rerecovery_timeout before we allow 
1682            another recovery to take place.
1683         */
1684         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1685         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1686         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1687
1688         return 0;
1689 }
1690
1691
1692 /*
1693   elections are won by first checking the number of connected nodes, then
1694   the priority time, then the pnn
1695  */
1696 struct election_message {
1697         uint32_t num_connected;
1698         struct timeval priority_time;
1699         uint32_t pnn;
1700         uint32_t node_flags;
1701 };
1702
1703 /*
1704   form this nodes election data
1705  */
1706 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1707 {
1708         int ret, i;
1709         struct ctdb_node_map *nodemap;
1710         struct ctdb_context *ctdb = rec->ctdb;
1711
1712         ZERO_STRUCTP(em);
1713
1714         em->pnn = rec->ctdb->pnn;
1715         em->priority_time = rec->priority_time;
1716
1717         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1718         if (ret != 0) {
1719                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1720                 return;
1721         }
1722
1723         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1724         em->node_flags = rec->node_flags;
1725
1726         for (i=0;i<nodemap->num;i++) {
1727                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1728                         em->num_connected++;
1729                 }
1730         }
1731
1732         /* we shouldnt try to win this election if we cant be a recmaster */
1733         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1734                 em->num_connected = 0;
1735                 em->priority_time = timeval_current();
1736         }
1737
1738         talloc_free(nodemap);
1739 }
1740
1741 /*
1742   see if the given election data wins
1743  */
1744 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1745 {
1746         struct election_message myem;
1747         int cmp = 0;
1748
1749         ctdb_election_data(rec, &myem);
1750
1751         /* we cant win if we dont have the recmaster capability */
1752         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1753                 return false;
1754         }
1755
1756         /* we cant win if we are banned */
1757         if (rec->node_flags & NODE_FLAGS_BANNED) {
1758                 return false;
1759         }       
1760
1761         /* we cant win if we are stopped */
1762         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1763                 return false;
1764         }       
1765
1766         /* we will automatically win if the other node is banned */
1767         if (em->node_flags & NODE_FLAGS_BANNED) {
1768                 return true;
1769         }
1770
1771         /* we will automatically win if the other node is banned */
1772         if (em->node_flags & NODE_FLAGS_STOPPED) {
1773                 return true;
1774         }
1775
1776         /* try to use the most connected node */
1777         if (cmp == 0) {
1778                 cmp = (int)myem.num_connected - (int)em->num_connected;
1779         }
1780
1781         /* then the longest running node */
1782         if (cmp == 0) {
1783                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1784         }
1785
1786         if (cmp == 0) {
1787                 cmp = (int)myem.pnn - (int)em->pnn;
1788         }
1789
1790         return cmp > 0;
1791 }
1792
1793 /*
1794   send out an election request
1795  */
1796 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1797 {
1798         int ret;
1799         TDB_DATA election_data;
1800         struct election_message emsg;
1801         uint64_t srvid;
1802         struct ctdb_context *ctdb = rec->ctdb;
1803
1804         srvid = CTDB_SRVID_RECOVERY;
1805
1806         ctdb_election_data(rec, &emsg);
1807
1808         election_data.dsize = sizeof(struct election_message);
1809         election_data.dptr  = (unsigned char *)&emsg;
1810
1811
1812         /* send an election message to all active nodes */
1813         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1814         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1815
1816
1817         /* A new node that is already frozen has entered the cluster.
1818            The existing nodes are not frozen and dont need to be frozen
1819            until the election has ended and we start the actual recovery
1820         */
1821         if (update_recmaster == true) {
1822                 /* first we assume we will win the election and set 
1823                    recoverymaster to be ourself on the current node
1824                  */
1825                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1826                 if (ret != 0) {
1827                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1828                         return -1;
1829                 }
1830         }
1831
1832
1833         return 0;
1834 }
1835
1836 /*
1837   this function will unban all nodes in the cluster
1838 */
1839 static void unban_all_nodes(struct ctdb_context *ctdb)
1840 {
1841         int ret, i;
1842         struct ctdb_node_map *nodemap;
1843         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1844         
1845         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1846         if (ret != 0) {
1847                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1848                 return;
1849         }
1850
1851         for (i=0;i<nodemap->num;i++) {
1852                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1853                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1854                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1855                 }
1856         }
1857
1858         talloc_free(tmp_ctx);
1859 }
1860
1861
1862 /*
1863   we think we are winning the election - send a broadcast election request
1864  */
1865 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1866 {
1867         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1868         int ret;
1869
1870         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1871         if (ret != 0) {
1872                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1873         }
1874
1875         talloc_free(rec->send_election_te);
1876         rec->send_election_te = NULL;
1877 }
1878
1879 /*
1880   handler for memory dumps
1881 */
1882 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1883                              TDB_DATA data, void *private_data)
1884 {
1885         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1886         TDB_DATA *dump;
1887         int ret;
1888         struct rd_memdump_reply *rd;
1889
1890         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1891                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1892                 talloc_free(tmp_ctx);
1893                 return;
1894         }
1895         rd = (struct rd_memdump_reply *)data.dptr;
1896
1897         dump = talloc_zero(tmp_ctx, TDB_DATA);
1898         if (dump == NULL) {
1899                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1900                 talloc_free(tmp_ctx);
1901                 return;
1902         }
1903         ret = ctdb_dump_memory(ctdb, dump);
1904         if (ret != 0) {
1905                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1906                 talloc_free(tmp_ctx);
1907                 return;
1908         }
1909
1910 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1911
1912         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1913         if (ret != 0) {
1914                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1915                 talloc_free(tmp_ctx);
1916                 return;
1917         }
1918
1919         talloc_free(tmp_ctx);
1920 }
1921
1922 /*
1923   handler for reload_nodes
1924 */
1925 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1926                              TDB_DATA data, void *private_data)
1927 {
1928         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1929
1930         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1931
1932         reload_nodes_file(rec->ctdb);
1933 }
1934
1935
1936 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1937                               struct timeval yt, void *p)
1938 {
1939         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1940
1941         talloc_free(rec->ip_check_disable_ctx);
1942         rec->ip_check_disable_ctx = NULL;
1943 }
1944
1945
1946 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1947                              TDB_DATA data, void *private_data)
1948 {
1949         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1950         struct ctdb_public_ip *ip;
1951
1952         if (rec->recmaster != rec->ctdb->pnn) {
1953                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1954                 return;
1955         }
1956
1957         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1958                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1959                 return;
1960         }
1961
1962         ip = (struct ctdb_public_ip *)data.dptr;
1963
1964         update_ip_assignment_tree(rec->ctdb, ip);
1965 }
1966
1967
1968 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1969                              TDB_DATA data, void *private_data)
1970 {
1971         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1972         uint32_t timeout;
1973
1974         if (rec->ip_check_disable_ctx != NULL) {
1975                 talloc_free(rec->ip_check_disable_ctx);
1976                 rec->ip_check_disable_ctx = NULL;
1977         }
1978
1979         if (data.dsize != sizeof(uint32_t)) {
1980                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1981                                  "expexting %lu\n", (long unsigned)data.dsize,
1982                                  (long unsigned)sizeof(uint32_t)));
1983                 return;
1984         }
1985         if (data.dptr == NULL) {
1986                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1987                 return;
1988         }
1989
1990         timeout = *((uint32_t *)data.dptr);
1991         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1992
1993         rec->ip_check_disable_ctx = talloc_new(rec);
1994         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1995
1996         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1997 }
1998
1999
2000 /*
2001   handler for ip reallocate, just add it to the list of callers and 
2002   handle this later in the monitor_cluster loop so we do not recurse
2003   with other callers to takeover_run()
2004 */
2005 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2006                              TDB_DATA data, void *private_data)
2007 {
2008         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2009         struct ip_reallocate_list *caller;
2010
2011         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2012                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2013                 return;
2014         }
2015
2016         if (rec->ip_reallocate_ctx == NULL) {
2017                 rec->ip_reallocate_ctx = talloc_new(rec);
2018                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2019         }
2020
2021         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2022         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2023
2024         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2025         caller->next = rec->reallocate_callers;
2026         rec->reallocate_callers = caller;
2027
2028         return;
2029 }
2030
2031 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2032 {
2033         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2034         TDB_DATA result;
2035         int32_t ret;
2036         struct ip_reallocate_list *callers;
2037         uint32_t culprit;
2038
2039         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2040
2041         /* update the list of public ips that a node can handle for
2042            all connected nodes
2043         */
2044         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2045         if (ret != 0) {
2046                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2047                                  culprit));
2048                 rec->need_takeover_run = true;
2049         }
2050         if (ret == 0) {
2051                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2052                 if (ret != 0) {
2053                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2054                         rec->need_takeover_run = true;
2055                 }
2056         }
2057
2058         result.dsize = sizeof(int32_t);
2059         result.dptr  = (uint8_t *)&ret;
2060
2061         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2062
2063                 /* Someone that sent srvid==0 does not want a reply */
2064                 if (callers->rd->srvid == 0) {
2065                         continue;
2066                 }
2067                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2068                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2069                                   (unsigned long long)callers->rd->srvid));
2070                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2071                 if (ret != 0) {
2072                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2073                                          "message to %u:%llu\n",
2074                                          (unsigned)callers->rd->pnn,
2075                                          (unsigned long long)callers->rd->srvid));
2076                 }
2077         }
2078
2079         talloc_free(tmp_ctx);
2080         talloc_free(rec->ip_reallocate_ctx);
2081         rec->ip_reallocate_ctx = NULL;
2082         rec->reallocate_callers = NULL;
2083         
2084 }
2085
2086
2087 /*
2088   handler for recovery master elections
2089 */
2090 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2091                              TDB_DATA data, void *private_data)
2092 {
2093         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2094         int ret;
2095         struct election_message *em = (struct election_message *)data.dptr;
2096         TALLOC_CTX *mem_ctx;
2097
2098         /* we got an election packet - update the timeout for the election */
2099         talloc_free(rec->election_timeout);
2100         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2101                                                 fast_start ?
2102                                                 timeval_current_ofs(0, 500000) :
2103                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2104                                                 ctdb_election_timeout, rec);
2105
2106         mem_ctx = talloc_new(ctdb);
2107
2108         /* someone called an election. check their election data
2109            and if we disagree and we would rather be the elected node, 
2110            send a new election message to all other nodes
2111          */
2112         if (ctdb_election_win(rec, em)) {
2113                 if (!rec->send_election_te) {
2114                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2115                                                                 timeval_current_ofs(0, 500000),
2116                                                                 election_send_request, rec);
2117                 }
2118                 talloc_free(mem_ctx);
2119                 /*unban_all_nodes(ctdb);*/
2120                 return;
2121         }
2122         
2123         /* we didn't win */
2124         talloc_free(rec->send_election_te);
2125         rec->send_election_te = NULL;
2126
2127         if (ctdb->tunable.verify_recovery_lock != 0) {
2128                 /* release the recmaster lock */
2129                 if (em->pnn != ctdb->pnn &&
2130                     ctdb->recovery_lock_fd != -1) {
2131                         close(ctdb->recovery_lock_fd);
2132                         ctdb->recovery_lock_fd = -1;
2133                         unban_all_nodes(ctdb);
2134                 }
2135         }
2136
2137         /* ok, let that guy become recmaster then */
2138         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2139         if (ret != 0) {
2140                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2141                 talloc_free(mem_ctx);
2142                 return;
2143         }
2144
2145         talloc_free(mem_ctx);
2146         return;
2147 }
2148
2149
2150 /*
2151   force the start of the election process
2152  */
2153 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2154                            struct ctdb_node_map *nodemap)
2155 {
2156         int ret;
2157         struct ctdb_context *ctdb = rec->ctdb;
2158
2159         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2160
2161         /* set all nodes to recovery mode to stop all internode traffic */
2162         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2163         if (ret != 0) {
2164                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2165                 return;
2166         }
2167
2168         talloc_free(rec->election_timeout);
2169         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2170                                                 fast_start ?
2171                                                 timeval_current_ofs(0, 500000) :
2172                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2173                                                 ctdb_election_timeout, rec);
2174
2175         ret = send_election_request(rec, pnn, true);
2176         if (ret!=0) {
2177                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2178                 return;
2179         }
2180
2181         /* wait for a few seconds to collect all responses */
2182         ctdb_wait_election(rec);
2183 }
2184
2185
2186
2187 /*
2188   handler for when a node changes its flags
2189 */
2190 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2191                             TDB_DATA data, void *private_data)
2192 {
2193         int ret;
2194         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2195         struct ctdb_node_map *nodemap=NULL;
2196         TALLOC_CTX *tmp_ctx;
2197         uint32_t changed_flags;
2198         int i;
2199         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2200         int disabled_flag_changed;
2201
2202         if (data.dsize != sizeof(*c)) {
2203                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2204                 return;
2205         }
2206
2207         tmp_ctx = talloc_new(ctdb);
2208         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2209
2210         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2211         if (ret != 0) {
2212                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2213                 talloc_free(tmp_ctx);
2214                 return;         
2215         }
2216
2217
2218         for (i=0;i<nodemap->num;i++) {
2219                 if (nodemap->nodes[i].pnn == c->pnn) break;
2220         }
2221
2222         if (i == nodemap->num) {
2223                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2224                 talloc_free(tmp_ctx);
2225                 return;
2226         }
2227
2228         changed_flags = c->old_flags ^ c->new_flags;
2229
2230         if (nodemap->nodes[i].flags != c->new_flags) {
2231                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2232         }
2233
2234         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2235
2236         nodemap->nodes[i].flags = c->new_flags;
2237
2238         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2239                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2240
2241         if (ret == 0) {
2242                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2243                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2244         }
2245         
2246         if (ret == 0 &&
2247             ctdb->recovery_master == ctdb->pnn &&
2248             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2249                 /* Only do the takeover run if the perm disabled or unhealthy
2250                    flags changed since these will cause an ip failover but not
2251                    a recovery.
2252                    If the node became disconnected or banned this will also
2253                    lead to an ip address failover but that is handled 
2254                    during recovery
2255                 */
2256                 if (disabled_flag_changed) {
2257                         rec->need_takeover_run = true;
2258                 }
2259         }
2260
2261         talloc_free(tmp_ctx);
2262 }
2263
2264 /*
2265   handler for when we need to push out flag changes ot all other nodes
2266 */
2267 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2268                             TDB_DATA data, void *private_data)
2269 {
2270         int ret;
2271         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2272         struct ctdb_node_map *nodemap=NULL;
2273         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2274         uint32_t recmaster;
2275         uint32_t *nodes;
2276
2277         /* find the recovery master */
2278         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2279         if (ret != 0) {
2280                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2281                 talloc_free(tmp_ctx);
2282                 return;
2283         }
2284
2285         /* read the node flags from the recmaster */
2286         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2289                 talloc_free(tmp_ctx);
2290                 return;
2291         }
2292         if (c->pnn >= nodemap->num) {
2293                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2294                 talloc_free(tmp_ctx);
2295                 return;
2296         }
2297
2298         /* send the flags update to all connected nodes */
2299         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2300
2301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2302                                       nodes, 0, CONTROL_TIMEOUT(),
2303                                       false, data,
2304                                       NULL, NULL,
2305                                       NULL) != 0) {
2306                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2307
2308                 talloc_free(tmp_ctx);
2309                 return;
2310         }
2311
2312         talloc_free(tmp_ctx);
2313 }
2314
2315
2316 struct verify_recmode_normal_data {
2317         uint32_t count;
2318         enum monitor_result status;
2319 };
2320
2321 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2322 {
2323         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2324
2325
2326         /* one more node has responded with recmode data*/
2327         rmdata->count--;
2328
2329         /* if we failed to get the recmode, then return an error and let
2330            the main loop try again.
2331         */
2332         if (state->state != CTDB_CONTROL_DONE) {
2333                 if (rmdata->status == MONITOR_OK) {
2334                         rmdata->status = MONITOR_FAILED;
2335                 }
2336                 return;
2337         }
2338
2339         /* if we got a response, then the recmode will be stored in the
2340            status field
2341         */
2342         if (state->status != CTDB_RECOVERY_NORMAL) {
2343                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2344                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2345         }
2346
2347         return;
2348 }
2349
2350
2351 /* verify that all nodes are in normal recovery mode */
2352 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2353 {
2354         struct verify_recmode_normal_data *rmdata;
2355         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2356         struct ctdb_client_control_state *state;
2357         enum monitor_result status;
2358         int j;
2359         
2360         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2361         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2362         rmdata->count  = 0;
2363         rmdata->status = MONITOR_OK;
2364
2365         /* loop over all active nodes and send an async getrecmode call to 
2366            them*/
2367         for (j=0; j<nodemap->num; j++) {
2368                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2369                         continue;
2370                 }
2371                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2372                                         CONTROL_TIMEOUT(), 
2373                                         nodemap->nodes[j].pnn);
2374                 if (state == NULL) {
2375                         /* we failed to send the control, treat this as 
2376                            an error and try again next iteration
2377                         */                      
2378                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2379                         talloc_free(mem_ctx);
2380                         return MONITOR_FAILED;
2381                 }
2382
2383                 /* set up the callback functions */
2384                 state->async.fn = verify_recmode_normal_callback;
2385                 state->async.private_data = rmdata;
2386
2387                 /* one more control to wait for to complete */
2388                 rmdata->count++;
2389         }
2390
2391
2392         /* now wait for up to the maximum number of seconds allowed
2393            or until all nodes we expect a response from has replied
2394         */
2395         while (rmdata->count > 0) {
2396                 event_loop_once(ctdb->ev);
2397         }
2398
2399         status = rmdata->status;
2400         talloc_free(mem_ctx);
2401         return status;
2402 }
2403
2404
2405 struct verify_recmaster_data {
2406         struct ctdb_recoverd *rec;
2407         uint32_t count;
2408         uint32_t pnn;
2409         enum monitor_result status;
2410 };
2411
2412 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2413 {
2414         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2415
2416
2417         /* one more node has responded with recmaster data*/
2418         rmdata->count--;
2419
2420         /* if we failed to get the recmaster, then return an error and let
2421            the main loop try again.
2422         */
2423         if (state->state != CTDB_CONTROL_DONE) {
2424                 if (rmdata->status == MONITOR_OK) {
2425                         rmdata->status = MONITOR_FAILED;
2426                 }
2427                 return;
2428         }
2429
2430         /* if we got a response, then the recmaster will be stored in the
2431            status field
2432         */
2433         if (state->status != rmdata->pnn) {
2434                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2435                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2436                 rmdata->status = MONITOR_ELECTION_NEEDED;
2437         }
2438
2439         return;
2440 }
2441
2442
2443 /* verify that all nodes agree that we are the recmaster */
2444 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2445 {
2446         struct ctdb_context *ctdb = rec->ctdb;
2447         struct verify_recmaster_data *rmdata;
2448         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2449         struct ctdb_client_control_state *state;
2450         enum monitor_result status;
2451         int j;
2452         
2453         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2454         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2455         rmdata->rec    = rec;
2456         rmdata->count  = 0;
2457         rmdata->pnn    = pnn;
2458         rmdata->status = MONITOR_OK;
2459
2460         /* loop over all active nodes and send an async getrecmaster call to 
2461            them*/
2462         for (j=0; j<nodemap->num; j++) {
2463                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2464                         continue;
2465                 }
2466                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2467                                         CONTROL_TIMEOUT(),
2468                                         nodemap->nodes[j].pnn);
2469                 if (state == NULL) {
2470                         /* we failed to send the control, treat this as 
2471                            an error and try again next iteration
2472                         */                      
2473                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2474                         talloc_free(mem_ctx);
2475                         return MONITOR_FAILED;
2476                 }
2477
2478                 /* set up the callback functions */
2479                 state->async.fn = verify_recmaster_callback;
2480                 state->async.private_data = rmdata;
2481
2482                 /* one more control to wait for to complete */
2483                 rmdata->count++;
2484         }
2485
2486
2487         /* now wait for up to the maximum number of seconds allowed
2488            or until all nodes we expect a response from has replied
2489         */
2490         while (rmdata->count > 0) {
2491                 event_loop_once(ctdb->ev);
2492         }
2493
2494         status = rmdata->status;
2495         talloc_free(mem_ctx);
2496         return status;
2497 }
2498
2499
2500 /* called to check that the local allocation of public ip addresses is ok.
2501 */
2502 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2503 {
2504         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2505         struct ctdb_control_get_ifaces *ifaces = NULL;
2506         struct ctdb_all_public_ips *ips = NULL;
2507         struct ctdb_uptime *uptime1 = NULL;
2508         struct ctdb_uptime *uptime2 = NULL;
2509         int ret, j;
2510         bool need_iface_check = false;
2511         bool need_takeover_run = false;
2512
2513         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2514                                 CTDB_CURRENT_NODE, &uptime1);
2515         if (ret != 0) {
2516                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2517                 talloc_free(mem_ctx);
2518                 return -1;
2519         }
2520
2521
2522         /* read the interfaces from the local node */
2523         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2524         if (ret != 0) {
2525                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2526                 talloc_free(mem_ctx);
2527                 return -1;
2528         }
2529
2530         if (!rec->ifaces) {
2531                 need_iface_check = true;
2532         } else if (rec->ifaces->num != ifaces->num) {
2533                 need_iface_check = true;
2534         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2535                 need_iface_check = true;
2536         }
2537
2538         if (need_iface_check) {
2539                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2540                                      "local node %u - force takeover run\n",
2541                                      pnn));
2542                 need_takeover_run = true;
2543         }
2544
2545         /* read the ip allocation from the local node */
2546         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2547         if (ret != 0) {
2548                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2549                 talloc_free(mem_ctx);
2550                 return -1;
2551         }
2552
2553         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2554                                 CTDB_CURRENT_NODE, &uptime2);
2555         if (ret != 0) {
2556                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2557                 talloc_free(mem_ctx);
2558                 return -1;
2559         }
2560
2561         /* skip the check if the startrecovery time has changed */
2562         if (timeval_compare(&uptime1->last_recovery_started,
2563                             &uptime2->last_recovery_started) != 0) {
2564                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2565                 talloc_free(mem_ctx);
2566                 return 0;
2567         }
2568
2569         /* skip the check if the endrecovery time has changed */
2570         if (timeval_compare(&uptime1->last_recovery_finished,
2571                             &uptime2->last_recovery_finished) != 0) {
2572                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2573                 talloc_free(mem_ctx);
2574                 return 0;
2575         }
2576
2577         /* skip the check if we have started but not finished recovery */
2578         if (timeval_compare(&uptime1->last_recovery_finished,
2579                             &uptime1->last_recovery_started) != 1) {
2580                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2581                 talloc_free(mem_ctx);
2582
2583                 return 0;
2584         }
2585
2586         talloc_free(rec->ifaces);
2587         rec->ifaces = talloc_steal(rec, ifaces);
2588
2589         /* verify that we have the ip addresses we should have
2590            and we dont have ones we shouldnt have.
2591            if we find an inconsistency we set recmode to
2592            active on the local node and wait for the recmaster
2593            to do a full blown recovery.
2594            also if the pnn is -1 and we are healthy and can host the ip
2595            we also request a ip reallocation.
2596         */
2597         if (ctdb->tunable.disable_ip_failover == 0) {
2598                 for (j=0; j<ips->num; j++) {
2599                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2600                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2601                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2602                                 need_takeover_run = true;
2603                         } else if (ips->ips[j].pnn == pnn) {
2604                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2605                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2606                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2607                                         need_takeover_run = true;
2608                                 }
2609                         } else {
2610                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2611                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2612                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2613                                         need_takeover_run = true;
2614                                 }
2615                         }
2616                 }
2617         }
2618
2619         if (need_takeover_run) {
2620                 struct takeover_run_reply rd;
2621                 TDB_DATA data;
2622
2623                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2624
2625                 rd.pnn = ctdb->pnn;
2626                 rd.srvid = 0;
2627                 data.dptr = (uint8_t *)&rd;
2628                 data.dsize = sizeof(rd);
2629
2630                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2631                 if (ret != 0) {
2632                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2633                 }
2634         }
2635         talloc_free(mem_ctx);
2636         return 0;
2637 }
2638
2639
2640 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2641 {
2642         struct ctdb_node_map **remote_nodemaps = callback_data;
2643
2644         if (node_pnn >= ctdb->num_nodes) {
2645                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2646                 return;
2647         }
2648
2649         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2650
2651 }
2652
2653 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2654         struct ctdb_node_map *nodemap,
2655         struct ctdb_node_map **remote_nodemaps)
2656 {
2657         uint32_t *nodes;
2658
2659         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2660         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2661                                         nodes, 0,
2662                                         CONTROL_TIMEOUT(), false, tdb_null,
2663                                         async_getnodemap_callback,
2664                                         NULL,
2665                                         remote_nodemaps) != 0) {
2666                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2667
2668                 return -1;
2669         }
2670
2671         return 0;
2672 }
2673
/* progress of a recovery lock check. the child process sends one of
   these values back to the parent as a single byte through a pipe.
*/
enum reclock_child_status {
        RECLOCK_CHECKING,       /* no verdict yet */
        RECLOCK_OK,             /* child reported success */
        RECLOCK_FAILED,         /* child reported an error, or the reply was bad */
        RECLOCK_TIMEOUT         /* the timeout fired before the child replied */
};

/* bookkeeping for one run of check_recovery_lock() */
struct ctdb_check_reclock_state {
        struct ctdb_context *ctdb;
        /* when the check was started; used to report the lock latency */
        struct timeval start_time;
        /* pipe between child and parent; an end that is not open is -1 */
        int fd[2];
        /* pid of the forked child */
        pid_t child;
        /* timeout timer for the child's reply */
        struct timed_event *te;
        /* read event on fd[0] */
        struct fd_event *fde;
        /* current verdict */
        enum reclock_child_status status;
};
2684
2685 /* when we free the reclock state we must kill any child process.
2686 */
2687 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2688 {
2689         struct ctdb_context *ctdb = state->ctdb;
2690
2691         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2692
2693         if (state->fd[0] != -1) {
2694                 close(state->fd[0]);
2695                 state->fd[0] = -1;
2696         }
2697         if (state->fd[1] != -1) {
2698                 close(state->fd[1]);
2699                 state->fd[1] = -1;
2700         }
2701         kill(state->child, SIGKILL);
2702         return 0;
2703 }
2704
2705 /*
2706   called if our check_reclock child times out. this would happen if
2707   i/o to the reclock file blocks.
2708  */
2709 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2710                                          struct timeval t, void *private_data)
2711 {
2712         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2713                                            struct ctdb_check_reclock_state);
2714
2715         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2716         state->status = RECLOCK_TIMEOUT;
2717 }
2718
2719 /* this is called when the child process has completed checking the reclock
2720    file and has written data back to us through the pipe.
2721 */
2722 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2723                              uint16_t flags, void *private_data)
2724 {
2725         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2726                                              struct ctdb_check_reclock_state);
2727         char c = 0;
2728         int ret;
2729
2730         /* we got a response from our child process so we can abort the
2731            timeout.
2732         */
2733         talloc_free(state->te);
2734         state->te = NULL;
2735
2736         ret = read(state->fd[0], &c, 1);
2737         if (ret != 1 || c != RECLOCK_OK) {
2738                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2739                 state->status = RECLOCK_FAILED;
2740
2741                 return;
2742         }
2743
2744         state->status = RECLOCK_OK;
2745         return;
2746 }
2747
2748 static int check_recovery_lock(struct ctdb_context *ctdb)
2749 {
2750         int ret;
2751         struct ctdb_check_reclock_state *state;
2752         pid_t parent = getpid();
2753
2754         if (ctdb->recovery_lock_fd == -1) {
2755                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2756                 return -1;
2757         }
2758
2759         state = talloc(ctdb, struct ctdb_check_reclock_state);
2760         CTDB_NO_MEMORY(ctdb, state);
2761
2762         state->ctdb = ctdb;
2763         state->start_time = timeval_current();
2764         state->status = RECLOCK_CHECKING;
2765         state->fd[0] = -1;
2766         state->fd[1] = -1;
2767
2768         ret = pipe(state->fd);
2769         if (ret != 0) {
2770                 talloc_free(state);
2771                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2772                 return -1;
2773         }
2774
2775         state->child = ctdb_fork(ctdb);
2776         if (state->child == (pid_t)-1) {
2777                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2778                 close(state->fd[0]);
2779                 state->fd[0] = -1;
2780                 close(state->fd[1]);
2781                 state->fd[1] = -1;
2782                 talloc_free(state);
2783                 return -1;
2784         }
2785
2786         if (state->child == 0) {
2787                 char cc = RECLOCK_OK;
2788                 close(state->fd[0]);
2789                 state->fd[0] = -1;
2790
2791                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2792                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2793                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2794                         cc = RECLOCK_FAILED;
2795                 }
2796
2797                 write(state->fd[1], &cc, 1);
2798                 /* make sure we die when our parent dies */
2799                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2800                         sleep(5);
2801                         write(state->fd[1], &cc, 1);
2802                 }
2803                 _exit(0);
2804         }
2805         close(state->fd[1]);
2806         state->fd[1] = -1;
2807         set_close_on_exec(state->fd[0]);
2808
2809         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2810
2811         talloc_set_destructor(state, check_reclock_destructor);
2812
2813         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2814                                     ctdb_check_reclock_timeout, state);
2815         if (state->te == NULL) {
2816                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2817                 talloc_free(state);
2818                 return -1;
2819         }
2820
2821         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2822                                 EVENT_FD_READ,
2823                                 reclock_child_handler,
2824                                 (void *)state);
2825
2826         if (state->fde == NULL) {
2827                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2828                 talloc_free(state);
2829                 return -1;
2830         }
2831         tevent_fd_set_auto_close(state->fde);
2832
2833         while (state->status == RECLOCK_CHECKING) {
2834                 event_loop_once(ctdb->ev);
2835         }
2836
2837         if (state->status == RECLOCK_FAILED) {
2838                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2839                 close(ctdb->recovery_lock_fd);
2840                 ctdb->recovery_lock_fd = -1;
2841                 talloc_free(state);
2842                 return -1;
2843         }
2844
2845         talloc_free(state);
2846         return 0;
2847 }
2848
2849 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2850 {
2851         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2852         const char *reclockfile;
2853
2854         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2855                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2856                 talloc_free(tmp_ctx);
2857                 return -1;      
2858         }
2859
2860         if (reclockfile == NULL) {
2861                 if (ctdb->recovery_lock_file != NULL) {
2862                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2863                         talloc_free(ctdb->recovery_lock_file);
2864                         ctdb->recovery_lock_file = NULL;
2865                         if (ctdb->recovery_lock_fd != -1) {
2866                                 close(ctdb->recovery_lock_fd);
2867                                 ctdb->recovery_lock_fd = -1;
2868                         }
2869                 }
2870                 ctdb->tunable.verify_recovery_lock = 0;
2871                 talloc_free(tmp_ctx);
2872                 return 0;
2873         }
2874
2875         if (ctdb->recovery_lock_file == NULL) {
2876                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2877                 if (ctdb->recovery_lock_fd != -1) {
2878                         close(ctdb->recovery_lock_fd);
2879                         ctdb->recovery_lock_fd = -1;
2880                 }
2881                 talloc_free(tmp_ctx);
2882                 return 0;
2883         }
2884
2885
2886         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2887                 talloc_free(tmp_ctx);
2888                 return 0;
2889         }
2890
2891         talloc_free(ctdb->recovery_lock_file);
2892         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2893         ctdb->tunable.verify_recovery_lock = 0;
2894         if (ctdb->recovery_lock_fd != -1) {
2895                 close(ctdb->recovery_lock_fd);
2896                 ctdb->recovery_lock_fd = -1;
2897         }
2898
2899         talloc_free(tmp_ctx);
2900         return 0;
2901 }
2902
2903 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2904                       TALLOC_CTX *mem_ctx)
2905 {
2906         uint32_t pnn;
2907         struct ctdb_node_map *nodemap=NULL;
2908         struct ctdb_node_map *recmaster_nodemap=NULL;
2909         struct ctdb_node_map **remote_nodemaps=NULL;
2910         struct ctdb_vnn_map *vnnmap=NULL;
2911         struct ctdb_vnn_map *remote_vnnmap=NULL;
2912         int32_t debug_level;
2913         int i, j, ret;
2914
2915
2916
2917         /* verify that the main daemon is still running */
2918         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2919                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2920                 exit(-1);
2921         }
2922
2923         /* ping the local daemon to tell it we are alive */
2924         ctdb_ctrl_recd_ping(ctdb);
2925
2926         if (rec->election_timeout) {
2927                 /* an election is in progress */
2928                 return;
2929         }
2930
2931         /* read the debug level from the parent and update locally */
2932         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2933         if (ret !=0) {
2934                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2935                 return;
2936         }
2937         LogLevel = debug_level;
2938
2939
2940         /* We must check if we need to ban a node here but we want to do this
2941            as early as possible so we dont wait until we have pulled the node
2942            map from the local node. thats why we have the hardcoded value 20
2943         */
2944         for (i=0; i<ctdb->num_nodes; i++) {
2945                 struct ctdb_banning_state *ban_state;
2946
2947                 if (ctdb->nodes[i]->ban_state == NULL) {
2948                         continue;
2949                 }
2950                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2951                 if (ban_state->count < 20) {
2952                         continue;
2953                 }
2954                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2955                         ctdb->nodes[i]->pnn, ban_state->count,
2956                         ctdb->tunable.recovery_ban_period));
2957                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2958                 ban_state->count = 0;
2959         }
2960
2961         /* get relevant tunables */
2962         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2963         if (ret != 0) {
2964                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2965                 return;
2966         }
2967
2968         /* get the current recovery lock file from the server */
2969         if (update_recovery_lock_file(ctdb) != 0) {
2970                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2971                 return;
2972         }
2973
2974         /* Make sure that if recovery lock verification becomes disabled when
2975            we close the file
2976         */
2977         if (ctdb->tunable.verify_recovery_lock == 0) {
2978                 if (ctdb->recovery_lock_fd != -1) {
2979                         close(ctdb->recovery_lock_fd);
2980                         ctdb->recovery_lock_fd = -1;
2981                 }
2982         }
2983
2984         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2985         if (pnn == (uint32_t)-1) {
2986                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2987                 return;
2988         }
2989
2990         /* get the vnnmap */
2991         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2992         if (ret != 0) {
2993                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2994                 return;
2995         }
2996
2997
2998         /* get number of nodes */
2999         if (rec->nodemap) {
3000                 talloc_free(rec->nodemap);
3001                 rec->nodemap = NULL;
3002                 nodemap=NULL;
3003         }
3004         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3005         if (ret != 0) {
3006                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3007                 return;
3008         }
3009         nodemap = rec->nodemap;
3010
3011         /* check which node is the recovery master */
3012         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3013         if (ret != 0) {
3014                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3015                 return;
3016         }
3017
3018         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3019         if (rec->recmaster != pnn) {
3020                 if (rec->ip_reallocate_ctx != NULL) {
3021                         talloc_free(rec->ip_reallocate_ctx);
3022                         rec->ip_reallocate_ctx = NULL;
3023                         rec->reallocate_callers = NULL;
3024                 }
3025         }
3026
3027         if (rec->recmaster == (uint32_t)-1) {
3028                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3029                 force_election(rec, pnn, nodemap);
3030                 return;
3031         }
3032
3033
3034         /* if the local daemon is STOPPED, we verify that the databases are
3035            also frozen and thet the recmode is set to active 
3036         */
3037         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3038                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3039                 if (ret != 0) {
3040                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3041                 }
3042                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3043                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3044
3045                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3046                         if (ret != 0) {
3047                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3048                                 return;
3049                         }
3050                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3051                         if (ret != 0) {
3052                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3053
3054                                 return;
3055                         }
3056                         return;
3057                 }
3058         }
3059         /* If the local node is stopped, verify we are not the recmaster 
3060            and yield this role if so
3061         */
3062         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3063                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3064                 force_election(rec, pnn, nodemap);
3065                 return;
3066         }
3067         
3068         /* check that we (recovery daemon) and the local ctdb daemon
3069            agrees on whether we are banned or not
3070         */
3071 //qqq
3072
3073         /* remember our own node flags */
3074         rec->node_flags = nodemap->nodes[pnn].flags;
3075
3076         /* count how many active nodes there are */
3077         rec->num_active    = 0;
3078         rec->num_connected = 0;
3079         for (i=0; i<nodemap->num; i++) {
3080                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3081                         rec->num_active++;
3082                 }
3083                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3084                         rec->num_connected++;
3085                 }
3086         }
3087
3088
3089         /* verify that the recmaster node is still active */
3090         for (j=0; j<nodemap->num; j++) {
3091                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3092                         break;
3093                 }
3094         }
3095
3096         if (j == nodemap->num) {
3097                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3098                 force_election(rec, pnn, nodemap);
3099                 return;
3100         }
3101
3102         /* if recovery master is disconnected we must elect a new recmaster */
3103         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3104                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3105                 force_election(rec, pnn, nodemap);
3106                 return;
3107         }
3108
3109         /* grab the nodemap from the recovery master to check if it is banned */
3110         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3111                                    mem_ctx, &recmaster_nodemap);
3112         if (ret != 0) {
3113                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3114                           nodemap->nodes[j].pnn));
3115                 return;
3116         }
3117
3118
3119         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3120                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3121                 force_election(rec, pnn, nodemap);
3122                 return;
3123         }
3124
3125
3126         /* verify that we have all ip addresses we should have and we dont
3127          * have addresses we shouldnt have.
3128          */ 
3129         if (ctdb->tunable.disable_ip_failover == 0) {
3130                 if (rec->ip_check_disable_ctx == NULL) {
3131                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3132                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3133                         }
3134                 }
3135         }
3136
3137
3138         /* if we are not the recmaster then we do not need to check
3139            if recovery is needed
3140          */
3141         if (pnn != rec->recmaster) {
3142                 return;
3143         }
3144
3145
3146         /* ensure our local copies of flags are right */
3147         ret = update_local_flags(rec, nodemap);
3148         if (ret == MONITOR_ELECTION_NEEDED) {
3149                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3150                 force_election(rec, pnn, nodemap);
3151                 return;
3152         }
3153         if (ret != MONITOR_OK) {
3154                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3155                 return;
3156         }
3157
3158         if (ctdb->num_nodes != nodemap->num) {
3159                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3160                 reload_nodes_file(ctdb);
3161                 return;
3162         }
3163
3164         /* verify that all active nodes agree that we are the recmaster */
3165         switch (verify_recmaster(rec, nodemap, pnn)) {
3166         case MONITOR_RECOVERY_NEEDED:
3167                 /* can not happen */
3168                 return;
3169         case MONITOR_ELECTION_NEEDED:
3170                 force_election(rec, pnn, nodemap);
3171                 return;
3172         case MONITOR_OK:
3173                 break;
3174         case MONITOR_FAILED:
3175                 return;
3176         }
3177
3178
3179         if (rec->need_recovery) {
3180                 /* a previous recovery didn't finish */
3181                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3182                 return;
3183         }
3184
3185         /* verify that all active nodes are in normal mode 
3186            and not in recovery mode 
3187         */
3188         switch (verify_recmode(ctdb, nodemap)) {
3189         case MONITOR_RECOVERY_NEEDED:
3190                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3191                 return;
3192         case MONITOR_FAILED:
3193                 return;
3194         case MONITOR_ELECTION_NEEDED:
3195                 /* can not happen */
3196         case MONITOR_OK:
3197                 break;
3198         }
3199
3200
3201         if (ctdb->tunable.verify_recovery_lock != 0) {
3202                 /* we should have the reclock - check its not stale */
3203                 ret = check_recovery_lock(ctdb);
3204                 if (ret != 0) {
3205                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3206                         ctdb_set_culprit(rec, ctdb->pnn);
3207                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3208                         return;
3209                 }
3210         }
3211
3212         /* if there are takeovers requested, perform it and notify the waiters */
3213         if (rec->reallocate_callers) {
3214                 process_ipreallocate_requests(ctdb, rec);
3215         }
3216
3217         /* get the nodemap for all active remote nodes
3218          */
3219         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3220         if (remote_nodemaps == NULL) {
3221                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3222                 return;
3223         }
3224         for(i=0; i<nodemap->num; i++) {
3225                 remote_nodemaps[i] = NULL;
3226         }
3227         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3228                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3229                 return;
3230         } 
3231
3232         /* verify that all other nodes have the same nodemap as we have
3233         */
3234         for (j=0; j<nodemap->num; j++) {
3235                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3236                         continue;
3237                 }
3238
3239                 if (remote_nodemaps[j] == NULL) {
3240                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3241                         ctdb_set_culprit(rec, j);
3242
3243                         return;
3244                 }
3245
3246                 /* if the nodes disagree on how many nodes there are
3247                    then this is a good reason to try recovery
3248                  */
3249                 if (remote_nodemaps[j]->num != nodemap->num) {
3250                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3251                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3252                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3253                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3254                         return;
3255                 }
3256
3257                 /* if the nodes disagree on which nodes exist and are
3258                    active, then that is also a good reason to do recovery
3259                  */
3260                 for (i=0;i<nodemap->num;i++) {
3261                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3262                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3263                                           nodemap->nodes[j].pnn, i, 
3264                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3265                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3266                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3267                                             vnnmap);
3268                                 return;
3269                         }
3270                 }
3271
3272                 /* verify the flags are consistent
3273                 */
3274                 for (i=0; i<nodemap->num; i++) {
3275                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3276                                 continue;
3277                         }
3278                         
3279                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3280                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3281                                   nodemap->nodes[j].pnn, 
3282                                   nodemap->nodes[i].pnn, 
3283                                   remote_nodemaps[j]->nodes[i].flags,
3284                                   nodemap->nodes[j].flags));
3285                                 if (i == j) {
3286                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3287                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3288                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3289                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3290                                                     vnnmap);
3291                                         return;
3292                                 } else {
3293                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3294                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3295                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3296                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3297                                                     vnnmap);
3298                                         return;
3299                                 }
3300                         }
3301                 }
3302         }
3303
3304
3305         /* there better be the same number of lmasters in the vnn map
3306            as there are active nodes or we will have to do a recovery
3307          */
3308         if (vnnmap->size != rec->num_active) {
3309                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3310                           vnnmap->size, rec->num_active));
3311                 ctdb_set_culprit(rec, ctdb->pnn);
3312                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3313                 return;
3314         }
3315
3316         /* verify that all active nodes in the nodemap also exist in 
3317            the vnnmap.
3318          */
3319         for (j=0; j<nodemap->num; j++) {
3320                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3321                         continue;
3322                 }
3323                 if (nodemap->nodes[j].pnn == pnn) {
3324                         continue;
3325                 }
3326
3327                 for (i=0; i<vnnmap->size; i++) {
3328                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3329                                 break;
3330                         }
3331                 }
3332                 if (i == vnnmap->size) {
3333                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3334                                   nodemap->nodes[j].pnn));
3335                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3336                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3337                         return;
3338                 }
3339         }
3340
3341         
3342         /* verify that all other nodes have the same vnnmap
3343            and are from the same generation
3344          */
3345         for (j=0; j<nodemap->num; j++) {
3346                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3347                         continue;
3348                 }
3349                 if (nodemap->nodes[j].pnn == pnn) {
3350                         continue;
3351                 }
3352
3353                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3354                                           mem_ctx, &remote_vnnmap);
3355                 if (ret != 0) {
3356                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3357                                   nodemap->nodes[j].pnn));
3358                         return;
3359                 }
3360
3361                 /* verify the vnnmap generation is the same */
3362                 if (vnnmap->generation != remote_vnnmap->generation) {
3363                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3364                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3365                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3366                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3367                         return;
3368                 }
3369
3370                 /* verify the vnnmap size is the same */
3371                 if (vnnmap->size != remote_vnnmap->size) {
3372                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3373                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3374                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3375                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3376                         return;
3377                 }
3378
3379                 /* verify the vnnmap is the same */
3380                 for (i=0;i<vnnmap->size;i++) {
3381                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3382                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3383                                           nodemap->nodes[j].pnn));
3384                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3385                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3386                                             vnnmap);
3387                                 return;
3388                         }
3389                 }
3390         }
3391
3392         /* we might need to change who has what IP assigned */
3393         if (rec->need_takeover_run) {
3394                 uint32_t culprit = (uint32_t)-1;
3395
3396                 rec->need_takeover_run = false;
3397
3398                 /* update the list of public ips that a node can handle for
3399                    all connected nodes
3400                 */
3401                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3402                 if (ret != 0) {
3403                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3404                                          culprit));
3405                         rec->need_takeover_run = true;
3406                         return;
3407                 }
3408
3409                 /* execute the "startrecovery" event script on all nodes */
3410                 ret = run_startrecovery_eventscript(rec, nodemap);
3411                 if (ret!=0) {
3412                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3413                         ctdb_set_culprit(rec, ctdb->pnn);
3414                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3415                         return;
3416                 }
3417
3418                 ret = ctdb_takeover_run(ctdb, nodemap);
3419                 if (ret != 0) {
3420                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3421                         return;
3422                 }
3423
3424                 /* execute the "recovered" event script on all nodes */
3425                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3426 #if 0
3427 // we cant check whether the event completed successfully
3428 // since this script WILL fail if the node is in recovery mode
3429 // and if that race happens, the code here would just cause a second
3430 // cascading recovery.
3431                 if (ret!=0) {
3432                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3433                         ctdb_set_culprit(rec, ctdb->pnn);
3434                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3435                 }
3436 #endif
3437         }
3438 }
3439
3440 /*
3441   the main monitoring loop
3442  */
3443 static void monitor_cluster(struct ctdb_context *ctdb)
3444 {
3445         struct ctdb_recoverd *rec;
3446
3447         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3448
3449         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3450         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3451
3452         rec->ctdb = ctdb;
3453
3454         rec->priority_time = timeval_current();
3455
3456         /* register a message port for sending memory dumps */
3457         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3458
3459         /* register a message port for recovery elections */
3460         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3461
3462         /* when nodes are disabled/enabled */
3463         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3464
3465         /* when we are asked to puch out a flag change */
3466         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3467
3468         /* register a message port for vacuum fetch */
3469         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3470
3471         /* register a message port for reloadnodes  */
3472         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3473
3474         /* register a message port for performing a takeover run */
3475         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3476
3477         /* register a message port for disabling the ip check for a short while */
3478         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3479
3480         /* register a message port for updating the recovery daemons node assignment for an ip */
3481         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3482
3483         for (;;) {
3484                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3485                 struct timeval start;
3486                 double elapsed;
3487
3488                 if (!mem_ctx) {
3489                         DEBUG(DEBUG_CRIT,(__location__
3490                                           " Failed to create temp context\n"));
3491                         exit(-1);
3492                 }
3493
3494                 start = timeval_current();
3495                 main_loop(ctdb, rec, mem_ctx);
3496                 talloc_free(mem_ctx);
3497
3498                 /* we only check for recovery once every second */
3499                 elapsed = timeval_elapsed(&start);
3500                 if (elapsed < ctdb->tunable.recover_interval) {
3501                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3502                                           - elapsed);
3503                 }
3504         }
3505 }
3506
3507 /*
3508   event handler for when the main ctdbd dies
3509  */
3510 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3511                                  uint16_t flags, void *private_data)
3512 {
3513         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3514         _exit(1);
3515 }
3516
3517 /*
3518   called regularly to verify that the recovery daemon is still running
3519  */
3520 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3521                               struct timeval yt, void *p)
3522 {
3523         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3524
3525         if (kill(ctdb->recoverd_pid, 0) != 0) {
3526                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3527
3528                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3529                                 ctdb_restart_recd, ctdb);
3530
3531                 return;
3532         }
3533
3534         event_add_timed(ctdb->ev, ctdb, 
3535                         timeval_current_ofs(30, 0),
3536                         ctdb_check_recd, ctdb);
3537 }
3538
3539 static void recd_sig_child_handler(struct event_context *ev,
3540         struct signal_event *se, int signum, int count,
3541         void *dont_care, 
3542         void *private_data)
3543 {
3544 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3545         int status;
3546         pid_t pid = -1;
3547
3548         while (pid != 0) {
3549                 pid = waitpid(-1, &status, WNOHANG);
3550                 if (pid == -1) {
3551                         if (errno != ECHILD) {
3552                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3553                         }
3554                         return;
3555                 }
3556                 if (pid > 0) {
3557                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3558                 }
3559         }
3560 }
3561
3562 /*
3563   startup the recovery daemon as a child of the main ctdb daemon
3564  */
3565 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3566 {
3567         int fd[2];
3568         struct signal_event *se;
3569         struct tevent_fd *fde;
3570
3571         if (pipe(fd) != 0) {
3572                 return -1;
3573         }
3574
3575         ctdb->ctdbd_pid = getpid();
3576
3577         ctdb->recoverd_pid = fork();
3578         if (ctdb->recoverd_pid == -1) {
3579                 return -1;
3580         }
3581         
3582         if (ctdb->recoverd_pid != 0) {
3583                 close(fd[0]);
3584                 event_add_timed(ctdb->ev, ctdb, 
3585                                 timeval_current_ofs(30, 0),
3586                                 ctdb_check_recd, ctdb);
3587                 return 0;
3588         }
3589
3590         close(fd[1]);
3591
3592         srandom(getpid() ^ time(NULL));
3593
3594         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3595                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3596                 exit(1);
3597         }
3598
3599         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3600
3601         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3602                      ctdb_recoverd_parent, &fd[0]);     
3603         tevent_fd_set_auto_close(fde);
3604
3605         /* set up a handler to pick up sigchld */
3606         se = event_add_signal(ctdb->ev, ctdb,
3607                                      SIGCHLD, 0,
3608                                      recd_sig_child_handler,
3609                                      ctdb);
3610         if (se == NULL) {
3611                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3612                 exit(1);
3613         }
3614
3615         monitor_cluster(ctdb);
3616
3617         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3618         return -1;
3619 }
3620
3621 /*
3622   shutdown the recovery daemon
3623  */
3624 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3625 {
3626         if (ctdb->recoverd_pid == 0) {
3627                 return;
3628         }
3629
3630         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3631         kill(ctdb->recoverd_pid, SIGTERM);
3632 }
3633
3634 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3635                        struct timeval t, void *private_data)
3636 {
3637         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3638
3639         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3640         ctdb_stop_recoverd(ctdb);
3641         ctdb_start_recoverd(ctdb);
3642 }